Best Python code snippet using SeleniumBase
twilltestcase.py
Source:twilltestcase.py
...58 pass59 def act_on_multiple_datasets( self, cntrller, library_id, do_action, ldda_ids='', strings_displayed=[] ):60 # Can't use the ~/library_admin/libraries form as twill barfs on it so we'll simulate the form submission61 # by going directly to the form action62 self.visit_url( '%s/library_common/act_on_multiple_datasets?cntrller=%s&library_id=%s&ldda_ids=%s&do_action=%s' %63 ( self.url, cntrller, library_id, ldda_ids, do_action ) )64 for check_str in strings_displayed:65 self.check_page_for_string( check_str )66 def add_bar_codes( self, cntrller, request_id, bar_codes, strings_displayed=[], strings_displayed_after_submit=[] ):67 url = "%s/requests_common/edit_samples?cntrller=%s&id=%s" % ( self.url, cntrller, request_id )68 self.visit_url( url )69 for check_str in strings_displayed:70 self.check_page_for_string( check_str )71 for sample_index, bar_code in enumerate( bar_codes ):72 tc.fv( "1", "sample_%i_bar_code" % sample_index, bar_code )73 tc.submit( "save_samples_button" )74 for check_str in strings_displayed_after_submit:75 self.check_page_for_string( check_str )76 def add_datasets_to_sample( self, request_id, sample_id, external_service_id, sample_datasets, strings_displayed=[], strings_displayed_after_submit=[] ):77 # visit the dataset selection page78 url = "%s/requests_admin/select_datasets_to_transfer?cntrller=requests_admin&sample_id=%s&request_id=%s&external_service_id=%s" % \79 ( self.url, sample_id, request_id, external_service_id )80 self.visit_url( url )81 for check_str in strings_displayed:82 self.check_page_for_string( check_str )83 # Datasets are associated with the given by the building the appropriate url84 # and calling it as the dataset selection UI is a javascript dynatree85 url = "%s/requests_admin/select_datasets_to_transfer?cntrller=requests_admin&sample_id=%s&request_id=%s" % ( self.url, sample_id, request_id )86 url += '&select_datasets_to_transfer_button=Select%20datasets'87 url += '&selected_datasets_to_transfer=%s' % ','.join( sample_datasets )88 self.visit_url( url )89 for check_str in strings_displayed_after_submit:90 self.check_page_for_string( check_str )91 def add_folder( self, cntrller, library_id, folder_id, name='Folder One', description='This is Folder One' ):92 """Create a new folder"""93 url = "%s/library_common/create_folder?cntrller=%s&library_id=%s&parent_id=%s" % ( self.url, cntrller, library_id, folder_id )94 self.visit_url( url )95 self.check_page_for_string( 'Create a new folder' )96 tc.fv( "1", "name", name )97 tc.fv( "1", "description", description )98 tc.submit( "new_folder_button" )99 check_str = escape( "The new folder named '%s' has been added to the data library." % name )100 self.check_page_for_string( check_str )101 def add_samples( self, cntrller, request_id, sample_value_tuples, folder_options=[], strings_displayed=[], strings_displayed_after_submit=[] ):102 url = "%s/requests_common/add_sample?cntrller=%s&request_id=%s&add_sample_button=Add+sample" % ( self.url, cntrller, request_id )103 self.visit_url( url )104 for check_str in strings_displayed:105 self.check_page_for_string( check_str )106 for sample_index, ( sample_name, target_library_info, sample_field_values ) in enumerate( sample_value_tuples ):107 tc.fv( "add_samples", "sample_%i_name" % sample_index, sample_name )108 if target_library_info[ 'library' ] is not None:109 tc.fv( "add_samples", "sample_%i_library_id" % sample_index, target_library_info[ 'library' ] )110 self.refresh_form( "sample_%i_library_id" % sample_index, target_library_info[ 'library' ] )111 # check if the folder selectfield has been correctly populated112 for check_str in folder_options:113 self.check_page_for_string( check_str )114 if target_library_info[ 'folder' ] is not None:115 tc.fv( "add_samples", "sample_%i_folder_id" % sample_index, target_library_info[ 'folder' ] )116 for field_index, field_value in enumerate( sample_field_values ):117 tc.fv( "add_samples", "sample_%i_field_%i" % ( sample_index, field_index ), field_value )118 # Do not click on Add sample button when all the sample have been added119 if sample_index < len( sample_value_tuples ) - 1:120 tc.submit( "add_sample_button" )121 # select the correct form before submitting it122 tc.fv( "add_samples", "copy_sample_index", "-1" )123 tc.submit( "save_samples_button" )124 for check_str in strings_displayed_after_submit:125 self.check_page_for_string( check_str )126 def add_tag( self, item_id, item_class, context, new_tag ):127 self.visit_url( "%s/tag/add_tag_async?item_id=%s&item_class=%s&context=%s&new_tag=%s" %128 ( self.url, item_id, item_class, context, new_tag ) )129 def add_template( self, cntrller, item_type, form_type, form_id, form_name,130 library_id=None, folder_id=None, ldda_id=None, request_type_id=None, sample_id=None ):131 """132 Add a new template to an item - for library items, the template will ALWAYS BE SET TO INHERITABLE here. If you want to133 dis-inherit your template, call the manage_library_template_inheritance() below immediately after you call this134 method in your test code. Templates added to Requesttype objects are always inherited to samples.135 """136 params = dict( cntrller=cntrller, item_type=item_type, form_type=form_type, library_id=library_id )137 url = "/library_common/add_template"138 if item_type == 'folder':139 params[ 'folder_id' ] = folder_id140 elif item_type == 'ldda':141 params[ 'ldda_id' ] = ldda_id142 self.visit_url( url, params )143 self.check_page_for_string( "Select a template for the" )144 self.refresh_form( "form_id", form_id )145 # For some unknown reason, twill barfs if the form number ( 1 ) is used in the following146 # rather than the form anme ( select_template ), so we have to use the form name.147 tc.fv( "select_template", "inheritable", '1' )148 tc.submit( "add_template_button" )149 self.check_page_for_string = 'A template based on the form "%s" has been added to this' % form_name150 def add_user_address( self, user_id, address_dict ):151 self.visit_url( "%s/user/new_address?cntrller=user&user_id=%s" % ( self.url, user_id ) )152 self.check_page_for_string( 'Add new address' )153 for field_name, value in address_dict.items():154 tc.fv( "1", field_name, value )155 tc.submit( "new_address_button" )156 self.check_page_for_string( 'Address (%s) has been added' % address_dict[ 'short_desc' ] )157 def associate_users_and_groups_with_role( self, role_id, role_name, user_ids=[], group_ids=[] ):158 url = "%s/admin/role?id=%s&role_members_edit_button=Save" % ( self.url, role_id )159 if user_ids:160 url += "&in_users=%s" % ','.join( user_ids )161 if group_ids:162 url += "&in_groups=%s" % ','.join( group_ids )163 self.visit_url( url )164 check_str = "Role '%s' has been updated with %d associated users and %d associated groups" % ( role_name, len( user_ids ), len( group_ids ) )165 self.check_page_for_string( check_str )166 def associate_users_and_roles_with_group( self, group_id, group_name, user_ids=[], role_ids=[] ):167 url = "%s/admin/manage_users_and_roles_for_group?id=%s&group_roles_users_edit_button=Save" % ( self.url, group_id )168 if user_ids:169 url += "&in_users=%s" % ','.join( user_ids )170 if role_ids:171 url += "&in_roles=%s" % ','.join( role_ids )172 self.visit_url( url )173 check_str = "Group '%s' has been updated with %d associated roles and %d associated users" % ( group_name, len( role_ids ), len( user_ids ) )174 self.check_page_for_string( check_str )175 def auto_detect_metadata( self, hda_id ):176 """Auto-detect history_dataset_association metadata"""177 self.visit_url( "%s/datasets/%s/edit" % ( self.url, self.security.encode_id( hda_id ) ) )178 self.check_page_for_string( 'This will inspect the dataset and attempt' )179 tc.fv( 'auto_detect', 'detect', 'Auto-detect' )180 tc.submit( 'detect' )181 try:182 self.check_page_for_string( 'Attributes have been queued to be updated' )183 self.wait()184 except AssertionError:185 self.check_page_for_string( 'Attributes updated' )186 def browse_groups( self, strings_displayed=[] ):187 self.visit_url( '%s/admin/groups' % self.url )188 for check_str in strings_displayed:189 self.check_page_for_string( check_str )190 def browse_libraries_admin( self, deleted=False, strings_displayed=[], strings_not_displayed=[] ):191 self.visit_url( '%s/library_admin/browse_libraries?sort=name&f-description=All&f-name=All&f-deleted=%s' % ( self.url, str( deleted ) ) )192 for check_str in strings_displayed:193 self.check_page_for_string( check_str )194 for check_str in strings_not_displayed:195 try:196 self.check_page_for_string( check_str )197 raise AssertionError( "String (%s) incorrectly displayed when browsing library." % check_str )198 except:199 pass200 def browse_libraries_regular_user( self, strings_displayed=[], strings_not_displayed=[] ):201 self.visit_url( '%s/library/browse_libraries' % self.url )202 for check_str in strings_displayed:203 self.check_page_for_string( check_str )204 for check_str in strings_not_displayed:205 try:206 self.check_page_for_string( check_str )207 raise AssertionError( "String (%s) incorrectly displayed when browsing library." % check_str )208 except:209 pass210 def browse_library( self, cntrller, library_id, show_deleted=False, strings_displayed=[], strings_not_displayed=[] ):211 self.visit_url( '%s/library_common/browse_library?cntrller=%s&id=%s&show_deleted=%s' % ( self.url, cntrller, library_id, str( show_deleted ) ) )212 for check_str in strings_displayed:213 self.check_page_for_string( check_str )214 for check_str in strings_not_displayed:215 try:216 self.check_page_for_string( check_str )217 raise AssertionError( "String (%s) incorrectly displayed when browsing library." % check_str )218 except:219 pass220 def browse_roles( self, strings_displayed=[] ):221 self.visit_url( '%s/admin/roles' % self.url )222 for check_str in strings_displayed:223 self.check_page_for_string( check_str )224 def change_sample_state( self, request_id, sample_ids, new_sample_state_id, comment='', strings_displayed=[], strings_displayed_after_submit=[] ):225 url = "%s/requests_common/edit_samples?cntrller=requests_admin&id=%s" % ( self.url, request_id )226 self.visit_url( url )227 for check_str in strings_displayed:228 self.check_page_for_string( check_str )229 for sample_id in sample_ids:230 tc.fv( "1", "select_sample_%i" % sample_id, True )231 tc.fv( "1", "sample_operation", 'Change state' )232 # refresh on change to show the sample states selectfield233 self.refresh_form( "sample_operation", 'Change state' )234 self.check_page_for_string( "Change current state" )235 tc.fv( "1", "sample_state_id", new_sample_state_id )236 tc.fv( "1", "sample_event_comment", comment )237 tc.submit( "save_samples_button" )238 for check_str in strings_displayed_after_submit:239 self.check_page_for_string( check_str )240 def change_sample_target_data_library( self, cntrller, request_id, sample_ids, new_library_id, new_folder_id, folder_options=[], comment='', strings_displayed=[], strings_displayed_after_submit=[] ):241 url = "%s/requests_common/edit_samples?cntrller=%s&id=%s" % ( self.url, cntrller, request_id )242 self.visit_url( url )243 for check_str in strings_displayed:244 self.check_page_for_string( check_str )245 for sample_id in sample_ids:246 tc.fv( "edit_samples", "select_sample_%i" % sample_id, True )247 tc.fv( "edit_samples", "sample_operation", 'Select data library and folder' )248 # refresh on change to show the data libraries selectfield249 self.refresh_form( "sample_operation", 'Select data library and folder' )250 self.check_page_for_string( "Select data library:" )251 tc.fv( "1", "sample_operation_library_id", new_library_id )252 # refresh on change to show the selectfield with the list of253 # folders in the selected data library above254 self.refresh_form( "sample_operation_library_id", new_library_id )255 self.check_page_for_string( "Select folder:" )256 # check if the folder selectfield has been correctly populated257 for check_str in folder_options:258 self.check_page_for_string( check_str )259 tc.fv( "1", "sample_operation_folder_id", new_folder_id )260 tc.submit( "save_samples_button" )261 for check_str in strings_displayed_after_submit:262 self.check_page_for_string( check_str )263 def change_datatype( self, hda_id, datatype ):264 """Change format of history_dataset_association"""265 self.visit_url( "%s/datasets/%s/edit" % ( self.url, self.security.encode_id( hda_id ) ) )266 self.check_page_for_string( 'This will change the datatype of the existing dataset but' )267 tc.fv( 'change_datatype', 'datatype', datatype )268 tc.submit( 'change' )269 self.check_page_for_string( 'Changed the type of dataset' )270 def check_archive_contents( self, archive, lddas ):271 def get_ldda_path( ldda ):272 path = ""273 parent_folder = ldda.library_dataset.folder274 while parent_folder is not None:275 if parent_folder.parent is None:276 path = os.path.join( parent_folder.library_root[0].name, path )277 break278 path = os.path.join( parent_folder.name, path )279 parent_folder = parent_folder.parent280 path += ldda.name281 return path282 def mkdir( file ):283 dir = os.path.join( tmpd, os.path.dirname( file ) )284 if not os.path.exists( dir ):285 os.makedirs( dir )286 tmpd = tempfile.mkdtemp()287 if tarfile.is_tarfile( archive ):288 t = tarfile.open( archive )289 for n in t.getnames():290 mkdir( n )291 t.extract( n, tmpd )292 t.close()293 elif zipfile.is_zipfile( archive ):294 z = zipfile.ZipFile( archive, 'r' )295 for n in z.namelist():296 mkdir( n )297 open( os.path.join( tmpd, n ), 'wb' ).write( z.read( n ) )298 z.close()299 else:300 raise Exception( 'Unable to read archive: %s' % archive )301 for ldda in lddas:302 orig_file = self.get_filename( ldda.name )303 downloaded_file = os.path.join( tmpd, get_ldda_path( ldda ) )304 assert os.path.exists( downloaded_file )305 try:306 self.files_diff( orig_file, downloaded_file )307 except AssertionError, err:308 errmsg = 'Library item %s different than expected, difference:\n' % ldda.name309 errmsg += str( err )310 errmsg += 'Unpacked archive remains in: %s\n' % tmpd311 raise AssertionError( errmsg )312 shutil.rmtree( tmpd )313 def check_for_strings( self, strings_displayed=[], strings_not_displayed=[] ):314 if strings_displayed:315 for string in strings_displayed:316 self.check_page_for_string( string )317 if strings_not_displayed:318 for string in strings_not_displayed:319 self.check_string_not_in_page( string )320 def check_hda_attribute_info( self, hda_id, strings_displayed=[] ):321 """Edit history_dataset_association attribute information"""322 for check_str in strings_displayed:323 self.check_page_for_string( check_str )324 def check_hda_json_for_key_value( self, hda_id, key, value, use_string_contains=False ):325 """326 Uses the history API to determine whether the current history:327 (1) Has a history dataset with the required ID.328 (2) That dataset has the required key.329 (3) The contents of that key match the provided value.330 If use_string_contains=True, this will perform a substring match, otherwise an exact match.331 """332 # TODO: multi key, value333 hda = dict()334 for history_item in self.get_history_from_api():335 if history_item[ 'id' ] == hda_id:336 hda = self.json_from_url( history_item[ 'url' ] )337 break338 if hda:339 if key in hda:340 if use_string_contains:341 return value in hda[ key ]342 else:343 return value == hda[ key ]344 return False345 def check_history_for_errors( self ):346 """Raises an exception if there are errors in a history"""347 self.visit_url( "/history" )348 page = self.last_page()349 if page.find( 'error' ) > -1:350 raise AssertionError( 'Errors in the history for user %s' % self.user )351 def check_history_for_string( self, patt, show_deleted=False ):352 """Breaks patt on whitespace and searches for each element seperately in the history"""353 if show_deleted:354 params = dict( show_deleted=True )355 self.visit_url( "/history", params )356 else:357 self.visit_url( "/history" )358 for subpatt in patt.split():359 try:360 tc.find( subpatt )361 except:362 fname = self.write_temp_file( tc.browser.get_html() )363 errmsg = "no match to '%s'\npage content written to '%s'" % ( subpatt, fname )364 raise AssertionError( errmsg )365 def check_history_for_exact_string( self, string, show_deleted=False ):366 """Looks for exact match to 'string' in history page"""367 if show_deleted:368 self.visit_url( "/history?show_deleted=True" )369 else:370 self.visit_url( "/history" )371 try:372 tc.find( string )373 except:374 fname = self.write_temp_file( tc.browser.get_html() )375 errmsg = "no match to '%s'\npage content written to '%s'" % ( string, fname )376 raise AssertionError( errmsg )377 def check_history_json( self, check_fn, show_deleted=None ):378 """379 Tries to find a JSON string in the history page using the regex pattern,380 parse it, and assert check_fn returns True when called on that parsed381 data.382 """383 try:384 json_data = self.get_history_from_api( show_deleted=show_deleted, show_details=True )385 check_result = check_fn( json_data )386 assert check_result, 'failed check_fn: %s (got %s)' % ( check_fn.func_name, str( check_result ) )387 except Exception, e:388 log.exception( e )389 log.debug( 'json_data: %s', ( '\n' + pprint.pformat( json_data ) if json_data else '(no match)' ) )390 fname = self.write_temp_file( tc.browser.get_html() )391 errmsg = ( "json could not be read\npage content written to '%s'" % ( fname ) )392 raise AssertionError( errmsg )393 def check_metadata_for_string( self, patt, hid=None ):394 """Looks for 'patt' in the edit page when editing a dataset"""395 data_list = self.get_history_as_data_list()396 self.assertTrue( data_list )397 if hid is None: # take last hid398 elem = data_list[-1]399 hid = int( elem.get('hid') )400 self.assertTrue( hid )401 self.visit_url( "/dataset/edit?hid=%s" % hid )402 for subpatt in patt.split():403 tc.find(subpatt)404 def check_page(self, strings_displayed, strings_displayed_count, strings_not_displayed):405 """Checks a page for strings displayed, not displayed and number of occurrences of a string"""406 for check_str in strings_displayed:407 self.check_page_for_string( check_str )408 for check_str, count in strings_displayed_count:409 self.check_string_count_in_page( check_str, count )410 for check_str in strings_not_displayed:411 self.check_string_not_in_page( check_str )412 def check_page_for_string( self, patt ):413 """Looks for 'patt' in the current browser page"""414 page = self.last_page()415 if page.find( patt ) == -1:416 fname = self.write_temp_file( page )417 errmsg = "no match to '%s'\npage content written to '%s'\npage: [[%s]]" % ( patt, fname, page )418 raise AssertionError( errmsg )419 def check_request_grid( self, cntrller, state, deleted=False, strings_displayed=[] ):420 params = { 'f-state': state, 'f-deleted': deleted, 'sort': 'create_time' }421 self.visit_url( '/%s/browse_requests' % cntrller, params )422 for check_str in strings_displayed:423 self.check_page_for_string( check_str )424 def check_string_count_in_page( self, patt, min_count ):425 """Checks the number of 'patt' occurrences in the current browser page"""426 page = self.last_page()427 patt_count = page.count( patt )428 # The number of occurrences of patt in the page should be at least min_count429 # so show error if patt_count is less than min_count430 if patt_count < min_count:431 fname = self.write_temp_file( page )432 errmsg = "%i occurrences of '%s' found instead of %i.\npage content written to '%s' " % ( min_count, patt, patt_count, fname )433 raise AssertionError( errmsg )434 def check_string_not_in_page( self, patt ):435 """Checks to make sure 'patt' is NOT in the page."""436 page = self.last_page()437 if page.find( patt ) != -1:438 fname = self.write_temp_file( page )439 errmsg = "string (%s) incorrectly displayed in page.\npage content written to '%s'" % ( patt, fname )440 raise AssertionError( errmsg )441 def clear_cookies( self ):442 tc.clear_cookies()443 def clear_form( self, form=0 ):444 """Clears a form"""445 tc.formclear(str(form))446 def copy_history( self, history_id, copy_choice, strings_displayed=[], strings_displayed_after_submit=[] ):447 self.visit_url( "/history/copy?id=%s" % history_id )448 for check_str in strings_displayed:449 self.check_page_for_string( check_str )450 tc.fv( '1', 'copy_choice', copy_choice )451 tc.submit( 'copy_choice_button' )452 for check_str in strings_displayed_after_submit:453 self.check_page_for_string( check_str )454 def convert_format( self, hda_id, target_type ):455 """Convert format of history_dataset_association"""456 self.visit_url( "%s/datasets/%s/edit" % ( self.url, self.security.encode_id( hda_id ) ) )457 self.check_page_for_string( 'This will inspect the dataset and attempt' )458 tc.fv( 'convert_data', 'target_type', target_type )459 tc.submit( 'convert_data' )460 self.check_page_for_string( 'The file conversion of Convert BED to GFF on data' )461 self.wait() # wait for the format convert tool to finish before returning462 def copy_history_item( self, source_history_id=None, source_dataset_id=None, target_history_id=None, all_target_history_ids=[],463 deleted_history_ids=[] ):464 """465 Copy 1 history_dataset_association to 1 history (Limited by twill since it doesn't support multiple466 field names, such as checkboxes467 """468 self.visit_url( "/dataset/copy_datasets" )469 self.check_page_for_string( 'Source History:' )470 # Make sure all of users active histories are displayed471 for id in all_target_history_ids:472 self.check_page_for_string( id )473 # Make sure only active histories are displayed474 for id in deleted_history_ids:475 try:476 self.check_page_for_string( id )477 raise AssertionError( "deleted history id %d displayed in list of target histories" % id )478 except:479 pass480 form_values = [ ( 'source_history', source_history_id ),481 ( 'target_history_id', target_history_id ),482 ( 'source_content_ids', 'dataset|%s' % source_dataset_id ),483 ( 'do_copy', True ) ]484 self.visit_url( "/dataset/copy_datasets", params=form_values )485 check_str = '1 dataset copied to 1 history'486 self.check_page_for_string( check_str )487 # Functions associated with user accounts488 def create( self, cntrller='user', email='test@bx.psu.edu', password='testuser', username='admin-user', redirect='' ):489 # HACK: don't use panels because late_javascripts() messes up the twill browser and it490 # can't find form fields (and hence user can't be logged in).491 params = dict( cntrller=cntrller, use_panels=False )492 self.visit_url( "/user/create", params )493 tc.fv( 'registration', 'email', email )494 tc.fv( 'registration', 'redirect', redirect )495 tc.fv( 'registration', 'password', password )496 tc.fv( 'registration', 'confirm', password )497 tc.fv( 'registration', 'username', username )498 tc.submit( 'create_user_button' )499 previously_created = False500 username_taken = False501 invalid_username = False502 try:503 self.check_page_for_string( "Created new user account" )504 except:505 try:506 # May have created the account in a previous test run...507 self.check_page_for_string( "User with that email already exists" )508 previously_created = True509 except:510 try:511 self.check_page_for_string( 'Public name is taken; please choose another' )512 username_taken = True513 except:514 try:515 # Note that we're only checking if the usr name is >< 4 chars here...516 self.check_page_for_string( 'Public name must be at least 4 characters in length' )517 invalid_username = True518 except:519 pass520 return previously_created, username_taken, invalid_username521 def create_library( self, name='Library One', description='This is Library One', synopsis='Synopsis for Library One' ):522 """Create a new library"""523 self.visit_url( "%s/library_admin/create_library" % self.url )524 self.check_page_for_string( 'Create a new data library' )525 tc.fv( "1", "name", name )526 tc.fv( "1", "description", description )527 tc.fv( "1", "synopsis", synopsis )528 tc.submit( "create_library_button" )529 check_str = escape( "The new library named '%s' has been created" % name )530 self.check_page_for_string( check_str )531 def create_user_with_info( self, email, password, username, user_info_values, user_type_fd_id='', cntrller='user',532 strings_displayed=[], strings_displayed_after_submit=[] ):533 # This method creates a new user with associated info534 self.visit_url( "%s/user/create?cntrller=%s&use_panels=False" % ( self.url, cntrller ) )535 tc.fv( "registration", "email", email )536 tc.fv( "registration", "password", password )537 tc.fv( "registration", "confirm", password )538 tc.fv( "registration", "username", username )539 if user_type_fd_id:540 # The user_type_fd_id SelectField requires a refresh_on_change541 self.refresh_form( 'user_type_fd_id', user_type_fd_id, form_id='registration' )542 tc.fv( "registration", "password", password )543 tc.fv( "registration", "confirm", password )544 for index, ( field_name, info_value ) in enumerate( user_info_values ):545 tc.fv( "registration", field_name, info_value )546 for check_str in strings_displayed:547 self.check_page_for_string( check_str)548 tc.submit( "create_user_button" )549 def create_group( self, name='Group One', in_user_ids=[], in_role_ids=[], create_role_for_group=False, strings_displayed=[] ):550 """Create a new group"""551 url = "/admin/groups"552 params = dict( operation='create', create_group_button='Save', name=name )553 if in_user_ids:554 params[ 'in_users' ] = ','.join( in_user_ids )555 if in_role_ids:556 params[ 'in_roles' ] = ','.join( in_role_ids )557 if create_role_for_group:558 params[ 'create_role_for_group' ] = [ 'yes', 'yes' ]559 doseq = True560 else:561 params[ 'create_role_for_group' ] = 'no'562 doseq = False563 self.visit_url( url, params=params, doseq=doseq )564 for check_str in strings_displayed:565 self.check_page_for_string( check_str )566 self.visit_url( "/admin/groups" )567 self.check_page_for_string( name )568 def create_form( self, name, description, form_type, field_type='TextField', form_layout_name='',569 num_fields=1, num_options=0, field_name='1_field_name', strings_displayed=[],570 strings_displayed_after_submit=[] ):571 """Create a new form definition."""572 self.visit_url( "%s/forms/create_form_definition" % self.url )573 for check_str in strings_displayed:574 self.check_page_for_string( check_str )575 tc.fv( "create_form_definition", "name", name )576 tc.fv( "create_form_definition", "description", description )577 tc.fv( "create_form_definition", "form_type_select_field", form_type )578 tc.submit( "create_form_button" )579 if form_type == "Sequencing Sample Form":580 tc.submit( "add_layout_grid" )581 tc.fv( "edit_form_definition", "grid_layout0", form_layout_name )582 # if not adding any fields at this time, remove the default empty field583 if num_fields == 0:584 tc.submit( "remove_button" )585 # Add fields to the new form definition586 for index1 in range( num_fields ):587 field_label = 'field_label_%i' % index1588 field_contents = field_type589 field_help_name = 'field_helptext_%i' % index1590 field_help_contents = 'Field %i help' % index1591 field_default = 'field_default_0'592 field_default_contents = '%s default contents' % form_type593 tc.fv( "edit_form_definition", field_label, field_contents )594 tc.fv( "edit_form_definition", field_help_name, field_help_contents )595 if field_type == 'SelectField':596 # SelectField field_type requires a refresh_on_change597 self.refresh_form( 'field_type_0', field_type )598 # Add options so our select list is functional599 if num_options == 0:600 # Default to 2 options601 num_options = 2602 for index2 in range( 1, num_options + 1 ):603 tc.submit( "addoption_0" )604 # Add contents to the new options fields605 for index2 in range( num_options ):606 option_field_name = 'field_0_option_%i' % index2607 option_field_value = 'Option%i' % index2608 tc.fv( "edit_form_definition", option_field_name, option_field_value )609 else:610 tc.fv( "edit_form_definition", "field_type_0", field_type )611 tc.fv( "edit_form_definition", 'field_name_0', field_name )612 tc.fv( "edit_form_definition", field_default, field_default_contents )613 # All done... now save614 tc.submit( "save_changes_button" )615 for check_str in strings_displayed_after_submit:616 self.check_page_for_string( check_str )617 def create_external_service( self, name, description, version, external_service_type_id, field_values={}, strings_displayed=[], strings_displayed_after_submit=[] ):618 self.visit_url( '%s/external_service/create_external_service' % self.url )619 for check_str in strings_displayed:620 self.check_page_for_string( check_str )621 tc.fv( "1", "name", name )622 tc.fv( "1", "description", description )623 tc.fv( "1", "version", version )624 self.refresh_form( "external_service_type_id", external_service_type_id )625 for field, value in field_values.items():626 tc.fv( "1", field, value )627 tc.submit( "create_external_service_button" )628 for check_str in strings_displayed_after_submit:629 self.check_page_for_string( check_str )630 def create_new_account_as_admin( self, email='test4@bx.psu.edu', password='testuser',631 username='regular-user4', redirect='' ):632 """Create a new account for another user"""633 # HACK: don't use panels because late_javascripts() messes up the twill browser and it634 # can't find form fields (and hence user can't be logged in).635 self.visit_url( "%s/user/create?cntrller=admin" % self.url )636 self.submit_form( 1, 'create_user_button', email=email, redirect=redirect, password=password, confirm=password, username=username )637 previously_created = False638 username_taken = False639 invalid_username = False640 try:641 self.check_page_for_string( "Created new user account" )642 except:643 try:644 # May have created the account in a previous test run...645 self.check_page_for_string( "User with that email already exists" )646 previously_created = True647 except:648 try:649 self.check_page_for_string( 'Public name is taken; please choose another' )650 username_taken = True651 except:652 try:653 # Note that we're only checking if the usr name is >< 4 chars here...654 self.check_page_for_string( 'Public name must be at least 4 characters in length' )655 invalid_username = True656 except:657 pass658 return previously_created, username_taken, invalid_username659 def create_request_type( self, name, desc, request_form_id, sample_form_id, states, strings_displayed=[], strings_displayed_after_submit=[] ):660 self.visit_url( "%s/request_type/create_request_type" % self.url )661 for check_str in strings_displayed:662 self.check_page_for_string( check_str )663 tc.fv( "1", "name", name )664 tc.fv( "1", "desc", desc )665 tc.fv( "1", "request_form_id", request_form_id )666 tc.fv( "1", "sample_form_id", sample_form_id )667 for index, state in enumerate(states):668 tc.fv("1", "state_name_%i" % index, state[0])669 tc.fv("1", "state_desc_%i" % index, state[1])670 tc.submit( "add_state_button" )671 tc.submit( "create_request_type_button" )672 for check_str in strings_displayed_after_submit:673 self.check_page_for_string( check_str )674 def create_request( self, cntrller, request_type_id, name, desc, field_value_tuples, other_users_id='',675 strings_displayed=[], strings_displayed_after_submit=[] ):676 self.visit_url( "%s/requests_common/create_request?cntrller=%s" % ( self.url, cntrller ) )677 # The request_type SelectList requires a refresh_on_change678 self.refresh_form( 'request_type_id', request_type_id )679 if cntrller == 'requests_admin' and other_users_id:680 # The admin is creating a request on behalf of another user681 # The user_id SelectField requires a refresh_on_change so that the selected682 # user's addresses will be populated in the AddressField widget683 self.refresh_form( "user_id", other_users_id )684 for check_str in strings_displayed:685 self.check_page_for_string( check_str )686 tc.fv( "1", "name", name )687 tc.fv( "1", "desc", desc )688 for index, field_value_tuple in enumerate( field_value_tuples ):689 field_name, field_value, refresh_on_change = field_value_tuple690 if refresh_on_change:691 # Only the AddressField type has a refresh on change setup on selecting an option692 address_option = field_value[0]693 address_value = field_value[1]694 self.refresh_form( field_name, address_option )695 if address_option == 'new':696 # handle new address697 self.check_page_for_string( 'Short address description' )698 for address_field, value in address_value.items():699 tc.fv( "1", field_name + '_' + address_field, value )700 else:701 # existing address702 tc.fv( "1", field_name, address_value )703 else:704 tc.fv( "1", field_name, field_value )705 tc.submit( "create_request_button" )706 for check_str in strings_displayed_after_submit:707 self.check_page_for_string( check_str )708 def create_role( self,709 name='Role One',710 description="This is Role One",711 in_user_ids=[],712 in_group_ids=[],713 create_group_for_role='',714 private_role='',715 strings_displayed=[] ):716 """Create a new role"""717 url = "/admin/roles"718 url_params = dict( operation='create', create_role_button='Save', name=name, description=description )719 if in_user_ids:720 url_params[ 'in_users' ] = ','.join( in_user_ids )721 if in_group_ids:722 url_params[ 'in_groups' ] = ','.join( in_group_ids )723 if create_group_for_role == 'yes':724 url_params[ 'create_group_for_role' ] = [ 'yes', 'yes' ]725 doseq = True726 else:727 doseq = False728 self.visit_url( url, params=url_params, doseq=doseq )729 for check_str in strings_displayed:730 self.check_page_for_string( check_str )731 if private_role:732 # Make sure no private roles are displayed733 try:734 self.check_page_for_string( private_role )735 errmsg = 'Private role %s displayed on Roles page' % private_role736 raise AssertionError( errmsg )737 except AssertionError:738 # Reaching here is the behavior we want since no private roles should be displayed739 pass740 self.visit_url( "%s/admin/roles" % self.url )741 self.check_page_for_string( name )742 def delete_current_history( self, strings_displayed=[] ):743 """Deletes the current history"""744 self.visit_url( "/history/delete_current" )745 for check_str in strings_displayed:746 self.check_page_for_string( check_str )747 def delete_history( self, id ):748 """Deletes one or more histories"""749 history_ids = self.get_all_history_ids_from_api()750 self.assertTrue( history_ids )751 num_deleted = len( id.split( ',' ) )752 self.visit_url( "/history/list?operation=delete&id=%s" % ( id ) )753 check_str = 'Deleted %d %s' % ( num_deleted, iff( num_deleted != 1, "histories", "history" ) )754 self.check_page_for_string( check_str )755 def delete_history_item( self, hda_id, strings_displayed=[] ):756 """Deletes an item from a history"""757 try:758 hda_id = int( hda_id )759 except:760 raise AssertionError( "Invalid hda_id '%s' - must be int" % hda_id )761 self.visit_url( "%s/datasets/%s/delete?show_deleted_on_refresh=False" % ( self.url, self.security.encode_id( hda_id ) ) )762 for check_str in strings_displayed:763 self.check_page_for_string( check_str )764 def delete_library_item( self, cntrller, library_id, item_id, item_name, item_type='library_dataset' ):765 """Mark a library item as deleted"""766 params = dict( cntrller=cntrller, library_id=library_id, item_id=item_id, item_type=item_type )767 self.visit_url( "/library_common/delete_library_item", params )768 if item_type == 'library_dataset':769 item_desc = 'Dataset'770 else:771 item_desc = item_type.capitalize()772 check_str = "marked deleted"773 self.check_for_strings( strings_displayed=[ item_desc, check_str ] )774 def delete_sample_datasets( self, sample_id, sample_dataset_ids, strings_displayed=[], strings_displayed_after_submit=[], strings_not_displayed=[] ):775 url = '%s/requests_admin/manage_datasets?cntrller=requests_admin&sample_id=%s' % ( self.url, sample_id )776 self.visit_url( url )777 for check_str in strings_displayed:778 self.check_page_for_string( check_str )779 # simulate selecting datasets and clicking the delete button on the sample datasets grid780 sample_dataset_ids_string = ','.join( sample_dataset_ids )781 params = dict( operation='delete', id=sample_dataset_ids_string )782 url = "/requests_admin/manage_datasets"783 self.visit_url( url, params )784 for check_str in strings_displayed_after_submit:785 self.check_page_for_string( check_str )786 for check_str in strings_not_displayed:787 self.check_string_not_in_page( check_str )788 def disable_access_via_link( self, history_id, strings_displayed=[], strings_displayed_after_submit=[] ):789 # twill barfs on this form, possibly because it contains no fields, but not sure.790 # In any case, we have to mimic the form submission791 self.visit_url( '/history/sharing', dict( id=history_id, disable_link_access=True ) )792 self.check_for_strings( strings_displayed=strings_displayed_after_submit )793 def display_history_item( self, hda_id, strings_displayed=[] ):794 """Displays a history item - simulates eye icon click"""795 self.visit_url( '%s/datasets/%s/display/' % ( self.url, self.security.encode_id( hda_id ) ) )796 for check_str in strings_displayed:797 self.check_page_for_string( check_str )798 def download_archive_of_library_files( self, cntrller, library_id, ldda_ids, format ):799 # Here it would be ideal to have twill set form values and submit the form, but800 # twill barfs on that due to the recently introduced page wrappers around the contents801 # of the browse_library.mako template which enable panel layout when visiting the802 # page from an external URL. By "barfs", I mean that twill somehow loses hod on the803 # cntrller param. We'll just simulate the form submission by building the URL manually.804 # Here's the old, better approach...805 # self.visit_url( "%s/library_common/browse_library?cntrller=%s&id=%s" % ( self.url, cntrller, library_id ) )806 # for ldda_id in ldda_ids:807 # tc.fv( "1", "ldda_ids", ldda_id )808 # tc.fv( "1", "do_action", format )809 # tc.submit( "action_on_datasets_button" )810 # Here's the new approach...811 params = dict( cntrller=cntrller, library_id=library_id, do_action=format, ldda_ids=ldda_ids )812 url = "/library_common/act_on_multiple_datasets"813 self.visit_url( url, params, doseq=True )814 tc.code( 200 )815 archive = self.write_temp_file( self.last_page(), suffix='.' + format )816 return archive817 def edit_basic_request_info( self, cntrller, request_id, name, new_name='', new_desc='', new_fields=[],818 strings_displayed=[], strings_displayed_after_submit=[] ):819 params = dict( cntrller=cntrller, id=request_id )820 self.visit_url( "/requests_common/edit_basic_request_info", params )821 for check_str in strings_displayed:822 self.check_page_for_string( check_str )823 if new_name:824 tc.fv( "1", "name", new_name )825 if new_desc:826 tc.fv( "1", "desc", new_desc )827 for index, ( field_name, field_value ) in enumerate( new_fields ):828 tc.fv( "1", field_name, field_value )829 tc.submit( "edit_basic_request_info_button" )830 for check_str in strings_displayed_after_submit:831 self.check_page_for_string( check_str )832 def edit_external_service( self, external_service_id, field_values={}, strings_displayed=[], strings_displayed_after_submit=[] ):833 self.visit_url( '%s/external_service/edit_external_service?id=%s' % ( self.url, external_service_id ) )834 for check_str in strings_displayed:835 self.check_page_for_string( check_str )836 for field, value in field_values.items():837 tc.fv( "1", field, value )838 tc.submit( "edit_external_service_button" )839 for check_str in strings_displayed_after_submit:840 self.check_page_for_string( check_str )841 def edit_form( self, id, form_type='', new_form_name='', new_form_desc='', field_dicts=[], field_index=0,842 strings_displayed=[], strings_not_displayed=[], strings_displayed_after_submit=[] ):843 """Edit form details; name and description"""844 self.visit_url( "/forms/edit_form_definition", params=dict( id=id ) )845 for check_str in strings_displayed:846 self.check_page_for_string( check_str )847 if new_form_name:848 tc.fv( "edit_form_definition", "name", new_form_name )849 if new_form_desc:850 tc.fv( "edit_form_definition", "description", new_form_desc )851 for i, field_dict in enumerate( field_dicts ):852 index = i + field_index853 tc.submit( "add_field_button" )854 field_label = "field_label_%i" % index855 field_label_value = field_dict[ 'label' ]856 field_help = "field_helptext_%i" % index857 field_help_value = field_dict[ 'desc' ]858 field_type = "field_type_%i" % index859 field_type_value = field_dict[ 'type' ]860 field_required = "field_required_%i" % index861 field_required_value = field_dict[ 'required' ]862 field_name = "field_name_%i" % index863 field_name_value = field_dict.get( 'name', '%i_field_name' % index )864 tc.fv( "edit_form_definition", field_label, field_label_value )865 tc.fv( "edit_form_definition", field_help, field_help_value )866 tc.fv( "edit_form_definition", field_required, field_required_value )867 tc.fv( "edit_form_definition", field_name, field_name_value )868 if field_type_value.lower() == 'selectfield':869 # SelectFields require a refresh_on_change870 self.refresh_form( field_type, field_type_value )871 for option_index, option in enumerate( field_dict[ 'selectlist' ] ):872 tc.submit( "addoption_%i" % index )873 tc.fv( "edit_form_definition", "field_%i_option_%i" % ( index, option_index ), option )874 else:875 tc.fv( "edit_form_definition", field_type, field_type_value )876 tc.submit( "save_changes_button" )877 for check_str in strings_displayed_after_submit:878 self.check_page_for_string( check_str )879 def edit_hda_attribute_info( self, hda_id, new_name='', new_info='', new_dbkey='', new_startcol='',880 strings_displayed=[], strings_not_displayed=[] ):881 """Edit history_dataset_association attribute information"""882 self.visit_url( "/datasets/%s/edit" % self.security.encode_id( hda_id ) )883 submit_required = False884 self.check_page_for_string( 'Edit Attributes' )885 if new_name:886 tc.fv( 'edit_attributes', 'name', new_name )887 submit_required = True888 if new_info:889 tc.fv( 'edit_attributes', 'info', new_info )890 submit_required = True891 if new_dbkey:892 tc.fv( 'edit_attributes', 'dbkey', new_dbkey )893 submit_required = True894 if new_startcol:895 tc.fv( 'edit_attributes', 'startCol', new_startcol )896 submit_required = True897 if submit_required:898 tc.submit( 'save' )899 self.check_page_for_string( 'Attributes updated' )900 for check_str in strings_displayed:901 self.check_page_for_string( check_str )902 for check_str in strings_not_displayed:903 try:904 self.check_page_for_string( check_str )905 raise AssertionError( "String (%s) incorrectly displayed on Edit Attributes page." % check_str )906 except:907 pass908 def edit_request_email_settings( self, cntrller, request_id, check_request_owner=True, additional_emails='',909 check_sample_states=[], strings_displayed=[], strings_displayed_after_submit=[] ):910 self.visit_url( "/requests_common/edit_basic_request_info", params=dict( cntrller=cntrller, id=request_id ) )911 for check_str in strings_displayed:912 self.check_page_for_string( check_str )913 tc.fv( "2", "email_address", check_request_owner )914 tc.fv( "2", "additional_email_addresses", additional_emails )915 for state_name, state_id, is_checked in check_sample_states:916 tc.fv( "2", "sample_state_%i" % state_id, is_checked )917 tc.submit( "edit_email_settings_button" )918 for check_str in strings_displayed_after_submit:919 self.check_page_for_string( check_str )920 def edit_samples( self, cntrller, request_id, sample_value_tuples, strings_displayed=[], strings_displayed_after_submit=[] ):921 params = dict( cntrller=cntrller, id=request_id )922 url = "/requests_common/edit_samples"923 self.visit_url( url, params=params )924 for check_str in strings_displayed:925 self.check_page_for_string( check_str )926 for sample_index, ( sample_name, target_library_info, sample_field_values ) in enumerate( sample_value_tuples ):927 tc.fv( "1", "sample_%i_name" % sample_index, sample_name )928 tc.fv( "1", "sample_%i_library_id" % sample_index, target_library_info[ 'library' ] )929 self.refresh_form( "sample_%i_library_id" % sample_index, target_library_info[ 'library' ] )930 tc.fv( "1", "sample_%i_folder_id" % sample_index, target_library_info[ 'folder' ] )931 for field_index, field_value in enumerate( sample_field_values ):932 tc.fv( "1", "sample_%i_field_%i" % ( sample_index, field_index ), field_value )933 tc.submit( "save_samples_button" )934 for check_str in strings_displayed_after_submit:935 self.check_page_for_string( check_str )936 def edit_template( self, cntrller, item_type, form_type, library_id, field_type, field_label_1, field_helptext_1, field_default_1,937 folder_id='', ldda_id='', action='add_field' ):938 """Edit the form fields defining a library template"""939 params = dict( cntrller=cntrller, item_type=item_type, form_type=form_type, library_id=library_id )940 self.visit_url( "/library_common/edit_template", params=params )941 self.check_page_for_string( "Edit form definition" )942 if action == 'add_field':943 tc.submit( "add_field_button" )944 tc.fv( "edit_form", "field_label_1", field_label_1 )945 tc.fv( "edit_form", "field_helptext_1", field_helptext_1 )946 if field_type == 'SelectField':947 # Performs a refresh_on_change in this case948 self.refresh_form( "field_type_1", field_type )949 else:950 tc.fv( "edit_form", "field_type_1", field_type )951 tc.fv( "edit_form", "field_default_1", field_default_1 )952 tc.submit( 'save_changes_button' )953 self.check_page_for_string( "The template for this data library has been updated with your changes." )954 def edit_user_info( self, cntrller='user', id='', new_email='', new_username='', password='', new_password='',955 info_values=[], strings_displayed=[], strings_displayed_after_submit=[] ):956 if cntrller == 'admin':957 url = "%s/admin/users?id=%s&operation=information" % ( self.url, id )958 else: # cntrller == 'user:959 # The user is editing his own info, so the user id is gotten from trans.user.960 url = "%s/user/manage_user_info?cntrller=user" % self.url961 self.visit_url( url )962 for check_str in strings_displayed:963 self.check_page_for_string( check_str )964 if new_email or new_username:965 if new_email:966 tc.fv( "login_info", "email", new_email )967 if new_username:968 tc.fv( "login_info", "username", new_username )969 tc.submit( "login_info_button" )970 if password and new_password:971 tc.fv( "change_password", "current", password )972 tc.fv( "change_password", "password", new_password )973 tc.fv( "change_password", "confirm", new_password )974 tc.submit( "change_password_button" )975 if info_values:976 for index, ( field_name, info_value ) in enumerate( info_values ):977 tc.fv( "user_info", field_name, info_value )978 tc.submit( "edit_user_info_button" )979 for check_str in strings_displayed_after_submit:980 self.check_page_for_string( check_str )981 def files_diff( self, file1, file2, attributes=None ):982 """Checks the contents of 2 files for differences"""983 def get_lines_diff( diff ):984 count = 0985 for line in diff:986 if ( line.startswith( '+' ) and not line.startswith( '+++' ) ) or ( line.startswith( '-' ) and not line.startswith( '---' ) ):987 count += 1988 return count989 if not filecmp.cmp( file1, file2 ):990 files_differ = False991 local_file = open( file1, 'U' ).readlines()992 history_data = open( file2, 'U' ).readlines()993 if attributes is None:994 attributes = {}995 if attributes.get( 'sort', False ):996 history_data.sort()997 # Why even bother with the check loop below, why not just use the diff output? This seems wasteful.998 if len( local_file ) == len( history_data ):999 for i in range( len( history_data ) ):1000 if local_file[i].rstrip( '\r\n' ) != history_data[i].rstrip( '\r\n' ):1001 files_differ = True1002 break1003 else:1004 files_differ = True1005 if files_differ:1006 allowed_diff_count = int(attributes.get( 'lines_diff', 0 ))1007 diff = list( difflib.unified_diff( local_file, history_data, "local_file", "history_data" ) )1008 diff_lines = get_lines_diff( diff )1009 if diff_lines > allowed_diff_count:1010 if 'GALAXY_TEST_RAW_DIFF' in os.environ:1011 diff_slice = diff1012 else:1013 if len(diff) < 60:1014 diff_slice = diff[0:40]1015 else:1016 diff_slice = diff[:25] + ["********\n", "*SNIP *\n", "********\n"] + diff[-25:]1017 # FIXME: This pdf stuff is rather special cased and has not been updated to consider lines_diff1018 # due to unknown desired behavior when used in conjunction with a non-zero lines_diff1019 # PDF forgiveness can probably be handled better by not special casing by __extension__ here1020 # and instead using lines_diff or a regular expression matching1021 # or by creating and using a specialized pdf comparison function1022 if file1.endswith( '.pdf' ) or file2.endswith( '.pdf' ):1023 # PDF files contain creation dates, modification dates, ids and descriptions that change with each1024 # new file, so we need to handle these differences. As long as the rest of the PDF file does1025 # not differ we're ok.1026 valid_diff_strs = [ 'description', 'createdate', 'creationdate', 'moddate', 'id', 'producer', 'creator' ]1027 valid_diff = False1028 invalid_diff_lines = 01029 for line in diff_slice:1030 # Make sure to lower case strings before checking.1031 line = line.lower()1032 # Diff lines will always start with a + or - character, but handle special cases: '--- local_file \n', '+++ history_data \n'1033 if ( line.startswith( '+' ) or line.startswith( '-' ) ) and line.find( 'local_file' ) < 0 and line.find( 'history_data' ) < 0:1034 for vdf in valid_diff_strs:1035 if line.find( vdf ) < 0:1036 valid_diff = False1037 else:1038 valid_diff = True1039 # Stop checking as soon as we know we have a valid difference1040 break1041 if not valid_diff:1042 invalid_diff_lines += 11043 log.info('## files diff on %s and %s lines_diff=%d, found diff = %d, found pdf invalid diff = %d' % (file1, file2, allowed_diff_count, diff_lines, invalid_diff_lines))1044 if invalid_diff_lines > allowed_diff_count:1045 # Print out diff_slice so we can see what failed1046 print "###### diff_slice ######"1047 raise AssertionError( "".join( diff_slice ) )1048 else:1049 log.info('## files diff on %s and %s lines_diff=%d, found diff = %d' % (file1, file2, allowed_diff_count, diff_lines))1050 for line in diff_slice:1051 for char in line:1052 if ord( char ) > 128:1053 raise AssertionError( "Binary data detected, not displaying diff" )1054 raise AssertionError( "".join( diff_slice ) )1055 def files_re_match( self, file1, file2, attributes=None ):1056 """Checks the contents of 2 files for differences using re.match"""1057 local_file = open( file1, 'U' ).readlines() # regex file1058 history_data = open( file2, 'U' ).readlines()1059 assert len( local_file ) == len( history_data ), 'Data File and Regular Expression File contain a different number of lines (%s != %s)\nHistory Data (first 40 lines):\n%s' % ( len( local_file ), len( history_data ), ''.join( history_data[:40] ) )1060 if attributes is None:1061 attributes = {}1062 if attributes.get( 'sort', False ):1063 history_data.sort()1064 lines_diff = int(attributes.get( 'lines_diff', 0 ))1065 line_diff_count = 01066 diffs = []1067 for i in range( len( history_data ) ):1068 if not re.match( local_file[i].rstrip( '\r\n' ), history_data[i].rstrip( '\r\n' ) ):1069 line_diff_count += 11070 diffs.append( 'Regular Expression: %s\nData file : %s' % ( local_file[i].rstrip( '\r\n' ), history_data[i].rstrip( '\r\n' ) ) )1071 if line_diff_count > lines_diff:1072 raise AssertionError( "Regular expression did not match data file (allowed variants=%i):\n%s" % ( lines_diff, "".join( diffs ) ) )1073 def files_re_match_multiline( self, file1, file2, attributes=None ):1074 """Checks the contents of 2 files for differences using re.match in multiline mode"""1075 local_file = open( file1, 'U' ).read() # regex file1076 if attributes is None:1077 attributes = {}1078 if attributes.get( 'sort', False ):1079 history_data = open( file2, 'U' ).readlines()1080 history_data.sort()1081 history_data = ''.join( history_data )1082 else:1083 history_data = open( file2, 'U' ).read()1084 # lines_diff not applicable to multiline matching1085 assert re.match( local_file, history_data, re.MULTILINE ), "Multiline Regular expression did not match data file"1086 def files_contains( self, file1, file2, attributes=None ):1087 """Checks the contents of file2 for substrings found in file1, on a per-line basis"""1088 local_file = open( file1, 'U' ).readlines() # regex file1089 # TODO: allow forcing ordering of contains1090 history_data = open( file2, 'U' ).read()1091 lines_diff = int( attributes.get( 'lines_diff', 0 ) )1092 line_diff_count = 01093 while local_file:1094 contains = local_file.pop( 0 ).rstrip( '\n\r' )1095 if contains not in history_data:1096 line_diff_count += 11097 if line_diff_count > lines_diff:1098 raise AssertionError( "Failed to find '%s' in history data. (lines_diff=%i):\n" % ( contains, lines_diff ) )1099 def find_hda_by_dataset_name( self, name, history=None ):1100 if history is None:1101 history = self.get_history_from_api()1102 for hda in history:1103 if hda[ 'name' ] == name:1104 return hda1105 def folder_info( self, cntrller, folder_id, library_id, name='', new_name='', description='', template_refresh_field_name='1_field_name',1106 template_refresh_field_contents='', template_fields=[], strings_displayed=[], strings_not_displayed=[],1107 strings_displayed_after_submit=[], strings_not_displayed_after_submit=[] ):1108 """Add information to a library using an existing template with 2 elements"""1109 self.visit_url( "%s/library_common/folder_info?cntrller=%s&id=%s&library_id=%s" %1110 ( self.url, cntrller, folder_id, library_id ) )1111 if name and new_name and description:1112 tc.fv( '1', "name", new_name )1113 tc.fv( '1', "description", description )1114 tc.submit( 'rename_folder_button' )1115 for check_str in strings_displayed:1116 self.check_page_for_string( check_str )1117 for check_str in strings_not_displayed:1118 try:1119 self.check_page_for_string( check_str )1120 raise AssertionError( "String (%s) incorrectly displayed." % check_str )1121 except:1122 pass1123 if template_refresh_field_contents:1124 # A template containing an AddressField is displayed on the form, so we need to refresh the form1125 # with the received template_refresh_field_contents. There are 2 forms on the folder_info page1126 # when in edit mode, and the 2nd one is the one we want.1127 self.refresh_form( template_refresh_field_name, template_refresh_field_contents, form_no=2 )1128 if template_fields:1129 # We have an information template associated with the folder, so1130 # there are 2 forms on this page and the template is the 2nd form1131 for field_name, field_value in template_fields:1132 tc.fv( "edit_info", field_name, field_value )1133 tc.submit( 'edit_info_button' )1134 for check_str in strings_displayed_after_submit:1135 self.check_page_for_string( check_str )1136 for check_str in strings_not_displayed_after_submit:1137 try:1138 self.check_page_for_string( check_str )1139 raise AssertionError( "String (%s) incorrectly displayed." % check_str )1140 except:1141 pass1142 def get_all_history_ids_from_api( self ):1143 return [ history['id'] for history in self.json_from_url( '/api/histories' ) ]1144 def get_filename( self, filename, shed_tool_id=None ):1145 if shed_tool_id and self.shed_tools_dict:1146 file_dir = self.shed_tools_dict[ shed_tool_id ]1147 if file_dir:1148 return os.path.abspath( os.path.join( file_dir, filename))1149 return self.test_data_resolver.get_filename( filename )1150 def get_form_controls( self, form ):1151 formcontrols = []1152 for i, control in enumerate( form.controls ):1153 formcontrols.append( "control %d: %s" % ( i, str( control ) ) )1154 return formcontrols1155 def get_hids_in_history( self, history_id ):1156 """Returns the list of hid values for items in a history"""1157 hids = []1158 api_url = '/api/histories/%s/contents' % history_id1159 hids = [ history_item[ 'hid' ] for history_item in self.json_from_url( api_url ) ]1160 return hids1161 def get_hids_in_histories( self ):1162 """Returns the list of hids values for items in all histories"""1163 history_ids = self.get_all_history_ids_from_api()1164 hids = []1165 for history_id in history_ids:1166 hids.extend( self.get_hids_in_history( history_id ) )1167 return hids1168 def get_history_as_data_list( self, show_deleted=False ):1169 """Returns the data elements of a history"""1170 tree = self.history_as_xml_tree( show_deleted=show_deleted )1171 data_list = [ elem for elem in tree.findall("data") ]1172 return data_list1173 def get_history_from_api( self, encoded_history_id=None, show_deleted=None, show_details=False ):1174 if encoded_history_id is None:1175 history = self.get_latest_history()1176 encoded_history_id = history[ 'id' ]1177 params = dict()1178 if show_deleted is not None:1179 params[ 'deleted' ] = show_deleted1180 api_url = '/api/histories/%s/contents' % encoded_history_id1181 json_data = self.json_from_url( api_url, params=params )1182 if show_deleted is not None:1183 hdas = []1184 for hda in json_data:1185 if show_deleted:1186 hdas.append( hda )1187 else:1188 if not hda[ 'deleted' ]:1189 hdas.append( hda )1190 json_data = hdas1191 if show_details:1192 params[ 'details' ] = ','.join( [ hda[ 'id' ] for hda in json_data ] )1193 api_url = '/api/histories/%s/contents' % encoded_history_id1194 json_data = self.json_from_url( api_url, params=params )1195 return json_data1196 def get_job_stdout( self, hda_id, format=False ):1197 return self._get_job_stream_output( hda_id, 'stdout', format )1198 def get_job_stderr( self, hda_id, format=False ):1199 return self._get_job_stream_output( hda_id, 'stderr', format )1200 def get_latest_history( self ):1201 return self.json_from_url( '/api/histories' )[ 0 ]1202 def get_running_datasets( self ):1203 self.visit_url( '/api/histories' )1204 history_id = loads( self.last_page() )[0][ 'id' ]1205 self.visit_url( '/api/histories/%s' % history_id )1206 jsondata = loads( self.last_page() )1207 return jsondata[ 'state' ] in [ 'queued', 'running' ]1208 def get_tags( self, item_id, item_class ):1209 self.visit_url( "%s/tag/get_tagging_elt_async?item_id=%s&item_class=%s" %1210 ( self.url, item_id, item_class ) )1211 def history_as_xml_tree( self, show_deleted=False ):1212 """Returns a parsed xml object of a history"""1213 self.visit_url( '/history?as_xml=True&show_deleted=%s' % show_deleted )1214 xml = self.last_page()1215 tree = ElementTree.fromstring(xml)1216 return tree1217 def history_set_default_permissions( self, permissions_out=[], permissions_in=[], role_id=3 ): # role.id = 3 is Private Role for test3@bx.psu.edu1218 # NOTE: Twill has a bug that requires the ~/user/permissions page to contain at least 1 option value1219 # in each select list or twill throws an exception, which is: ParseError: OPTION outside of SELECT1220 # Due to this bug, we'll bypass visiting the page, and simply pass the permissions on to the1221 # /user/set_default_permissions method.1222 url = "root/history_set_default_permissions?update_roles_button=Save&id=None&dataset=True"1223 for po in permissions_out:1224 key = '%s_out' % po1225 url = "%s&%s=%s" % ( url, key, str( role_id ) )1226 for pi in permissions_in:1227 key = '%s_in' % pi1228 url = "%s&%s=%s" % ( url, key, str( role_id ) )1229 self.visit_url( "%s/%s" % ( self.url, url ) )1230 self.check_page_for_string( 'Default history permissions have been changed.' )1231 def histories_as_xml_tree( self ):1232 """Returns a parsed xml object of all histories"""1233 self.visit_url( '/history/list_as_xml' )1234 xml = self.last_page()1235 tree = ElementTree.fromstring(xml)1236 return tree1237 def history_options( self, user=False, active_datasets=False, activatable_datasets=False, histories_shared_by_others=False ):1238 """Mimics user clicking on history options link"""1239 self.visit_url( "/root/history_options" )1240 if user:1241 self.check_page_for_string( 'Previously</a> stored histories' )1242 if active_datasets:1243 self.check_page_for_string( 'Create</a> a new empty history' )1244 self.check_page_for_string( 'Construct workflow</a> from current history' )1245 self.check_page_for_string( 'Copy</a> current history' )1246 self.check_page_for_string( 'Share</a> current history' )1247 self.check_page_for_string( 'Change default permissions</a> for current history' )1248 if histories_shared_by_others:1249 self.check_page_for_string( 'Histories</a> shared with you by others' )1250 if activatable_datasets:1251 self.check_page_for_string( 'Show deleted</a> datasets in current history' )1252 self.check_page_for_string( 'Rename</a> current history' )1253 self.check_page_for_string( 'Delete</a> current history' )1254 def import_datasets_to_histories( self, cntrller, library_id, ldda_ids='', new_history_name='Unnamed history', strings_displayed=[] ):1255 # Can't use the ~/library_admin/libraries form as twill barfs on it so we'll simulate the form submission1256 # by going directly to the form action1257 self.visit_url( '%s/library_common/import_datasets_to_histories?cntrller=%s&library_id=%s&ldda_ids=%s&new_history_name=%s&import_datasets_to_histories_button=Import+library+datasets' %1258 ( self.url, cntrller, library_id, ldda_ids, new_history_name ) )1259 for check_str in strings_displayed:1260 self.check_page_for_string( check_str )1261 def import_history_via_url( self, history_id, email, strings_displayed_after_submit=[] ):1262 self.visit_url( "/history/imp", params=dict( id=history_id ) )1263 self.check_for_strings( strings_displayed=strings_displayed_after_submit )1264 def is_binary( self, filename ):1265 temp = open( filename, "U" )1266 lineno = 01267 for line in temp:1268 lineno += 11269 line = line.strip()1270 if line:1271 for char in line:1272 if ord( char ) > 128:1273 return True1274 if lineno > 10:1275 break1276 return False1277 def is_history_empty( self ):1278 """1279 Uses history page JSON to determine whether this history is empty1280 (i.e. has no undeleted datasets).1281 """1282 return len( self.get_history_from_api() ) == 01283 def is_zipped( self, filename ):1284 if not zipfile.is_zipfile( filename ):1285 return False1286 return True1287 def json_from_url( self, url, params={} ):1288 self.visit_url( url, params )1289 return loads( self.last_page() )1290 def last_page( self ):1291 return tc.browser.get_html()1292 def last_url( self ):1293 return tc.browser.get_url()1294 def ldda_permissions( self, cntrller, library_id, folder_id, id, role_ids_str,1295 permissions_in=[], permissions_out=[], strings_displayed=[], ldda_name='' ):1296 # role_ids_str must be a comma-separated string of role ids1297 params = dict( cntrller=cntrller, library_id=library_id, folder_id=folder_id, id=id )1298 url = "/library_common/ldda_permissions"1299 for po in permissions_out:1300 params[ '%s_out' % po ] = role_ids_str1301 for pi in permissions_in:1302 params[ '%s_in' % pi ] = role_ids_str1303 if permissions_in or permissions_out:1304 params[ 'update_roles_button' ] = 'Save'1305 self.visit_url( url, params )1306 if not strings_displayed:1307 strings_displayed = [ "Permissions updated for dataset '%s'." % ldda_name ]1308 for check_str in strings_displayed:1309 self.check_page_for_string( check_str )1310 def ldda_info( self, cntrller, library_id, folder_id, ldda_id, strings_displayed=[], strings_not_displayed=[] ):1311 """View library_dataset_dataset_association information"""1312 self.visit_url( "%s/library_common/ldda_info?cntrller=%s&library_id=%s&folder_id=%s&id=%s" %1313 ( self.url, cntrller, library_id, folder_id, ldda_id ) )1314 for check_str in strings_displayed:1315 self.check_page_for_string( check_str )1316 for check_str in strings_not_displayed:1317 try:1318 self.check_page_for_string( check_str )1319 raise AssertionError( "String (%s) should not have been displayed on ldda info page." % check_str )1320 except:1321 pass1322 def ldda_edit_info( self, cntrller, library_id, folder_id, ldda_id, ldda_name, new_ldda_name='', template_refresh_field_name='1_field_name',1323 template_refresh_field_contents='', template_fields=[], strings_displayed=[], strings_not_displayed=[] ):1324 """Edit library_dataset_dataset_association information, optionally template element information"""1325 self.visit_url( "%s/library_common/ldda_edit_info?cntrller=%s&library_id=%s&folder_id=%s&id=%s" %1326 ( self.url, cntrller, library_id, folder_id, ldda_id ) )1327 check_str = 'Edit attributes of %s' % ldda_name1328 self.check_page_for_string( check_str )1329 if new_ldda_name:1330 tc.fv( '1', 'name', new_ldda_name )1331 tc.submit( 'save' )1332 check_str = escape( "Attributes updated for library dataset '%s'." % new_ldda_name )1333 self.check_page_for_string( check_str )1334 if template_refresh_field_contents:1335 # A template containing an AddressField is displayed on the upload form, so we need to refresh the form1336 # with the received template_refresh_field_contents. There are 4 forms on this page, and the template is1337 # contained in the 4th form named "edit_info".1338 self.refresh_form( template_refresh_field_name, template_refresh_field_contents, form_no=4 )1339 if template_fields:1340 # We have an information template associated with the folder, so1341 # there are 2 forms on this page and the template is the 2nd form1342 for field_name, field_value in template_fields:1343 tc.fv( "edit_info", field_name, field_value )1344 tc.submit( 'edit_info_button' )1345 for check_str in strings_displayed:1346 self.check_page_for_string( check_str )1347 for check_str in strings_not_displayed:1348 try:1349 self.check_page_for_string( check_str )1350 raise AssertionError( "String (%s) should not have been displayed on ldda Edit Attributes page." % check_str )1351 except:1352 pass1353 def library_info( self, cntrller, library_id, library_name='', new_name='', new_description='', new_synopsis='',1354 template_fields=[], strings_displayed=[] ):1355 """Edit information about a library, optionally using an existing template with up to 2 elements"""1356 self.visit_url( "%s/library_common/library_info?cntrller=%s&id=%s" % ( self.url, cntrller, library_id ) )1357 for check_str in strings_displayed:1358 self.check_page_for_string( check_str )1359 if new_name and new_description and new_synopsis:1360 tc.fv( '1', 'name', new_name )1361 tc.fv( '1', 'description', new_description )1362 tc.fv( '1', 'synopsis', new_synopsis )1363 tc.submit( 'library_info_button' )1364 self.check_page_for_string( "Information updated for library" )1365 if template_fields:1366 for field_name, field_value in template_fields:1367 # The 2nd form on the page contains the template, and the form is named edit_info.1368 # Set the template field value1369 tc.fv( "edit_info", field_name, field_value )1370 tc.submit( 'edit_info_button' )1371 def library_permissions( self, library_id, library_name, role_ids_str, permissions_in, permissions_out, cntrller='library_admin' ):1372 # role_ids_str must be a comma-separated string of role ids1373 url = "library_common/library_permissions?id=%s&cntrller=%s&update_roles_button=Save" % ( library_id, cntrller )1374 for po in permissions_out:1375 key = '%s_out' % po1376 url = "%s&%s=%s" % ( url, key, role_ids_str )1377 for pi in permissions_in:1378 key = '%s_in' % pi1379 url = "%s&%s=%s" % ( url, key, role_ids_str )1380 self.visit_url( "%s/%s" % ( self.url, url ) )1381 check_str = escape( "Permissions updated for library '%s'." % library_name )1382 self.check_page_for_string( check_str )1383 def library_wait( self, library_id, cntrller='library_admin', maxiter=90 ):1384 """Waits for the tools to finish"""1385 count = 01386 sleep_amount = 11387 while count < maxiter:1388 count += 11389 self.visit_url( "%s/library_common/browse_library?cntrller=%s&id=%s" % ( self.url, cntrller, library_id ) )1390 page = tc.browser.get_html()1391 if page.find( '<!-- running: do not change this comment, used by TwillTestCase.library_wait -->' ) > -1:1392 time.sleep( sleep_amount )1393 sleep_amount += 11394 else:1395 break1396 self.assertNotEqual(count, maxiter)1397 def login( self, email='test@bx.psu.edu', password='testuser', username='admin-user', redirect='', logout_first=True ):1398 # Clear cookies.1399 if logout_first:1400 self.logout()1401 # test@bx.psu.edu is configured as an admin user1402 previously_created, username_taken, invalid_username = \1403 self.create( email=email, password=password, username=username, redirect=redirect )1404 if previously_created:1405 # The acount has previously been created, so just login.1406 # HACK: don't use panels because late_javascripts() messes up the twill browser and it1407 # can't find form fields (and hence user can't be logged in).1408 self.visit_url( "/user/login?use_panels=False" )1409 self.submit_form( 'login', 'login_button', login=email, redirect=redirect, password=password )1410 def logout( self ):1411 self.visit_url( "%s/user/logout" % self.url )1412 self.check_page_for_string( "You have been logged out" )1413 tc.browser.cj.clear()1414 def make_accessible_via_link( self, history_id, strings_displayed=[], strings_displayed_after_submit=[] ):1415 # twill barfs on this form, possibly because it contains no fields, but not sure.1416 # In any case, we have to mimic the form submission1417 self.visit_url( '/history/sharing', dict( id=history_id, make_accessible_via_link=True ) )1418 self.check_for_strings( strings_displayed=strings_displayed_after_submit )1419 def make_library_item_public( self, library_id, id, cntrller='library_admin', item_type='library',1420 contents=False, library_name='', folder_name='', ldda_name='' ):1421 url = "%s/library_common/make_library_item_public?cntrller=%s&library_id=%s&item_type=%s&id=%s&contents=%s" % \1422 ( self.url, cntrller, library_id, item_type, id, str( contents ) )1423 self.visit_url( url )1424 if item_type == 'library':1425 if contents:1426 check_str = "The data library (%s) and all its contents have been made publicly accessible." % library_name1427 else:1428 check_str = "The data library (%s) has been made publicly accessible, but access to its contents has been left unchanged." % library_name1429 elif item_type == 'folder':1430 check_str = "All of the contents of folder (%s) have been made publicly accessible." % folder_name1431 elif item_type == 'ldda':1432 check_str = "The libary dataset (%s) has been made publicly accessible." % ldda_name1433 self.check_page_for_string( check_str )1434 def makeTfname(self, fname=None):1435 """safe temp name - preserve the file extension for tools that interpret it"""1436 suffix = os.path.split(fname)[-1] # ignore full path1437 fd, temp_prefix = tempfile.mkstemp(prefix='tmp', suffix=suffix)1438 return temp_prefix1439 def manage_library_template_inheritance( self, cntrller, item_type, library_id, folder_id=None, ldda_id=None, inheritable=True ):1440 # If inheritable is True, the item is currently inheritable.1441 if item_type == 'library':1442 url = "%s/library_common/manage_template_inheritance?cntrller=%s&item_type=%s&library_id=%s" % \1443 ( self.url, cntrller, item_type, library_id )1444 elif item_type == 'folder':1445 url = "%s/library_common/manage_template_inheritance?cntrller=%s&item_type=%s&library_id=%s&folder_id=%s" % \1446 ( self.url, cntrller, item_type, library_id, folder_id )1447 elif item_type == 'ldda':1448 url = "%s/library_common/manage_template_inheritance?cntrller=%s&item_type=%s&library_id=%s&folder_id=%s&ldda_id=%s" % \1449 ( self.url, cntrller, item_type, library_id, folder_id, ldda_id )1450 self.visit_url( url )1451 if inheritable:1452 self.check_page_for_string = 'will no longer be inherited to contained folders and datasets'1453 else:1454 self.check_page_for_string = 'will now be inherited to contained folders and datasets'1455 def manage_roles_and_groups_for_user( self, user_id, in_role_ids=[], out_role_ids=[],1456 in_group_ids=[], out_group_ids=[], strings_displayed=[] ):1457 url = "%s/admin/manage_roles_and_groups_for_user?id=%s" % ( self.url, user_id )1458 if in_role_ids:1459 url += "&in_roles=%s" % ','.join( in_role_ids )1460 if out_role_ids:1461 url += "&out_roles=%s" % ','.join( out_role_ids )1462 if in_group_ids:1463 url += "&in_groups=%s" % ','.join( in_group_ids )1464 if out_group_ids:1465 url += "&out_groups=%s" % ','.join( out_group_ids )1466 if in_role_ids or out_role_ids or in_group_ids or out_group_ids:1467 url += "&user_roles_groups_edit_button=Save"1468 self.visit_url( url )1469 for check_str in strings_displayed:1470 self.check_page_for_string( check_str )1471 def mark_form_deleted( self, form_id ):1472 """Mark a form_definition as deleted"""1473 url = "%s/forms/delete_form_definition?id=%s" % ( self.url, form_id )1474 self.visit_url( url )1475 check_str = "1 forms have been deleted."1476 self.check_page_for_string( check_str )1477 def mark_group_deleted( self, group_id, group_name ):1478 """Mark a group as deleted"""1479 self.visit_url( "%s/admin/groups?operation=delete&id=%s" % ( self.url, group_id ) )1480 check_str = "Deleted 1 groups: %s" % group_name1481 self.check_page_for_string( check_str )1482 def mark_role_deleted( self, role_id, role_name ):1483 """Mark a role as deleted"""1484 self.visit_url( "%s/admin/roles?operation=delete&id=%s" % ( self.url, role_id ) )1485 check_str = "Deleted 1 roles: %s" % role_name1486 self.check_page_for_string( check_str )1487 def mark_user_deleted( self, user_id, email='' ):1488 """Mark a user as deleted"""1489 self.visit_url( "%s/admin/users?operation=delete&id=%s" % ( self.url, user_id ) )1490 check_str = "Deleted 1 users"1491 self.check_page_for_string( check_str )1492 def move_library_item( self, cntrller, item_type, item_id, source_library_id, make_target_current,1493 target_library_id=None, target_folder_id=None, strings_displayed=[], strings_displayed_after_submit=[] ):1494 params = dict( cntrller=cntrller,1495 item_type=item_type,1496 item_id=item_id,1497 source_library_id=source_library_id,1498 make_target_current=make_target_current )1499 if target_library_id is not None:1500 params[ 'target_library_id' ] = target_library_id1501 if target_folder_id is not None:1502 params[ 'target_folder_id' ] = target_folder_id1503 self.visit_url( "%s/library_common/move_library_item" % self.url, params=params )1504 if target_library_id:1505 self.refresh_form( 'target_library_id', target_library_id, form_name='move_library_item' )1506 if target_folder_id:1507 tc.fv( '1', 'target_folder_id', target_folder_id )1508 for check_str in strings_displayed:1509 self.check_page_for_string( check_str )1510 tc.submit( 'move_library_item_button' )1511 for check_str in strings_displayed_after_submit:1512 self.check_page_for_string( check_str )1513 def new_history( self, name=None ):1514 """Creates a new, empty history"""1515 if name:1516 self.visit_url( "%s/history_new?name=%s" % ( self.url, name ) )1517 else:1518 self.visit_url( "%s/history_new" % self.url )1519 self.check_page_for_string( 'New history created' )1520 assert self.is_history_empty(), 'Creating new history did not result in an empty history.'1521 def purge_group( self, group_id, group_name ):1522 """Purge an existing group"""1523 self.visit_url( "%s/admin/groups?operation=purge&id=%s" % ( self.url, group_id ) )1524 check_str = "Purged 1 groups: %s" % group_name1525 self.check_page_for_string( check_str )1526 def purge_library( self, library_id, library_name ):1527 """Purge a library"""1528 params = dict( id=library_id )1529 self.visit_url( "/library_admin/purge_library", params )1530 check_str = "Library '%s' and all of its contents have been purged" % library_name1531 self.check_page_for_string( check_str )1532 def purge_role( self, role_id, role_name ):1533 """Purge an existing role"""1534 self.visit_url( "%s/admin/roles?operation=purge&id=%s" % ( self.url, role_id ) )1535 check_str = "Purged 1 roles: %s" % role_name1536 self.check_page_for_string( check_str )1537 def purge_user( self, user_id, email ):1538 """Purge a user account"""1539 self.visit_url( "%s/admin/users?operation=purge&id=%s" % ( self.url, user_id ) )1540 check_str = "Purged 1 users"1541 self.check_page_for_string( check_str )1542 def refresh_form( self, control_name, value, form_no=0, form_id=None, form_name=None, **kwd ):1543 """Handles Galaxy's refresh_on_change for forms without ultimately submitting the form"""1544 # control_name is the name of the form field that requires refresh_on_change, and value is1545 # the value to which that field is being set.1546 for i, f in enumerate( self.showforms() ):1547 if i == form_no or ( form_id is not None and f.id == form_id ) or ( form_name is not None and f.name == form_name ):1548 break1549 formcontrols = self.get_form_controls( f )1550 try:1551 control = f.find_control( name=control_name )1552 except:1553 log.debug( '\n'.join( formcontrols ) )1554 # This assumes we always want the first control of the given name, which may not be ideal...1555 control = f.find_control( name=control_name, nr=0 )1556 # Check for refresh_on_change attribute, submit a change if required1557 if 'refresh_on_change' in control.attrs.keys():1558 # Clear Control and set to proper value1559 control.clear()1560 tc.fv( f.name, control.name, value )1561 # Create a new submit control, allows form to refresh, instead of going to next page1562 control = ClientForm.SubmitControl( 'SubmitControl', '___refresh_grouping___', {'name': 'refresh_grouping'} )1563 control.add_to_form( f )1564 control.fixup()1565 # Submit for refresh1566 tc.submit( '___refresh_grouping___' )1567 def reject_request( self, request_id, request_name, comment, strings_displayed=[], strings_displayed_after_submit=[] ):1568 self.visit_url( "%s/requests_admin/reject_request?id=%s" % ( self.url, request_id ) )1569 for check_str in strings_displayed:1570 self.check_page_for_string( check_str )1571 tc.fv( "1", "comment", comment )1572 tc.submit( "reject_button" )1573 for check_str in strings_displayed_after_submit:1574 self.check_page_for_string( check_str )1575 def reload_external_service( self, external_service_type_id, strings_displayed=[], strings_displayed_after_submit=[] ):1576 self.visit_url( '%s/external_service/reload_external_service_types' % self.url )1577 for check_str in strings_displayed:1578 self.check_page_for_string( check_str )1579 tc.fv( "1", "external_service_type_id", external_service_type_id )1580 tc.submit( "reload_external_service_type_button" )1581 for check_str in strings_displayed_after_submit:1582 self.check_page_for_string( check_str )1583 def reload_page( self ):1584 tc.reload()1585 tc.code(200)1586 def rename_role( self, role_id, name='Role One Renamed', description='This is Role One Re-described' ):1587 """Rename a role"""1588 self.visit_url( "%s/admin/roles?operation=rename&id=%s" % ( self.url, role_id ) )1589 self.check_page_for_string( 'Change role name and description' )1590 tc.fv( "1", "name", name )1591 tc.fv( "1", "description", description )1592 tc.submit( "rename_role_button" )1593 def rename_group( self, group_id, name='Group One Renamed' ):1594 """Rename a group"""1595 self.visit_url( "%s/admin/groups?operation=rename&id=%s" % ( self.url, group_id ) )1596 self.check_page_for_string( 'Change group name' )1597 tc.fv( "1", "name", name )1598 tc.submit( "rename_group_button" )1599 def rename_history( self, id, old_name, new_name ):1600 """Rename an existing history"""1601 self.visit_url( "/history/rename", params=dict( id=id, name=new_name ) )1602 check_str = 'History: %s renamed to: %s' % ( old_name, urllib.unquote( new_name ) )1603 self.check_page_for_string( check_str )1604 def rename_sample_datasets( self, sample_id, sample_dataset_ids, new_sample_dataset_names, strings_displayed=[], strings_displayed_after_submit=[] ):1605 sample_dataset_ids_string = ','.join( sample_dataset_ids )1606 url = "%s/requests_admin/manage_datasets?operation=rename&id=%s" % ( self.url, sample_dataset_ids_string )1607 self.visit_url( url )1608 for check_str in strings_displayed:1609 self.check_page_for_string( check_str )1610 for sample_dataset_id, ( prefix, new_name ) in zip( sample_dataset_ids, new_sample_dataset_names ):1611 tc.fv( "1", 'rename_datasets_for_sample_%s' % sample_dataset_id, prefix )1612 tc.fv( "1", 'new_name_%s' % sample_dataset_id, new_name )1613 tc.submit( "rename_datasets_button" )1614 for check_str in strings_displayed_after_submit:1615 self.check_page_for_string( check_str )1616 def request_type_permissions( self, request_type_id, request_type_name, role_ids_str, permissions_in, permissions_out ):1617 # role_ids_str must be a comma-separated string of role ids1618 url = "request_type/request_type_permissions?id=%s&update_roles_button=Save" % ( request_type_id )1619 for po in permissions_out:1620 key = '%s_out' % po1621 url = "%s&%s=%s" % ( url, key, role_ids_str )1622 for pi in permissions_in:1623 key = '%s_in' % pi1624 url = "%s&%s=%s" % ( url, key, role_ids_str )1625 self.visit_url( "%s/%s" % ( self.url, url ) )1626 check_str = "Permissions updated for request type '%s'" % request_type_name1627 self.check_page_for_string( check_str )1628 def reset_password_as_admin( self, user_id, password='testreset' ):1629 """Reset a user password"""1630 self.visit_url( "%s/admin/reset_user_password?id=%s" % ( self.url, user_id ) )1631 tc.fv( "1", "password", password )1632 tc.fv( "1", "confirm", password )1633 tc.submit( "reset_user_password_button" )1634 self.check_page_for_string( "Passwords reset for 1 user." )1635 def run_ucsc_main( self, track_params, output_params ):1636 """Gets Data From UCSC"""1637 tool_id = "ucsc_table_direct1"1638 galaxy_url = "%s/tool_runner/index?" % self.url1639 track_params.update( dict( GALAXY_URL=galaxy_url, hgta_compressType='none', tool_id=tool_id ) )1640 self.visit_url( "http://genome.ucsc.edu/cgi-bin/hgTables", params=track_params )1641 tc.fv( 'mainForm', 'checkboxGalaxy', 'on' )1642 tc.submit( 'hgta_doTopSubmit' )1643 tc.fv( 2, "hgta_geneSeqType", "genomic" )1644 tc.submit( 'hgta_doGenePredSequence' )1645 tc.fv( 2, 'hgSeq.casing', 'upper' )1646 tc.submit( 'hgta_doGalaxyQuery' )1647 def save_log( *path ):1648 """Saves the log to a file"""1649 filename = os.path.join( *path )1650 file(filename, 'wt').write(buffer.getvalue())1651 def set_history( self ):1652 """Sets the history (stores the cookies for this run)"""1653 if self.history_id:1654 self.visit_url( "/history", params=dict( id=self.history_id ) )1655 else:1656 self.new_history()1657 def share_current_history( self, email, strings_displayed=[], strings_displayed_after_submit=[],1658 action='', action_strings_displayed=[], action_strings_displayed_after_submit=[] ):1659 """Share the current history with different users"""1660 self.visit_url( "%s/history/share" % self.url )1661 for check_str in strings_displayed:1662 self.check_page_for_string( check_str )1663 tc.fv( 'share', 'email', email )1664 tc.submit( 'share_button' )1665 for check_str in strings_displayed_after_submit:1666 self.check_page_for_string( check_str )1667 if action:1668 # If we have an action, then we are sharing datasets with users that do not have access permissions on them1669 for check_str in action_strings_displayed:1670 self.check_page_for_string( check_str )1671 tc.fv( 'share_restricted', 'action', action )1672 tc.submit( "share_restricted_button" )1673 for check_str in action_strings_displayed_after_submit:1674 self.check_page_for_string( check_str )1675 def share_histories_with_users( self, ids, emails, strings_displayed=[], strings_displayed_after_submit=[],1676 action=None, action_strings_displayed=[] ):1677 """Share one or more histories with one or more different users"""1678 self.visit_url( "%s/history/list?id=%s&operation=Share" % ( self.url, ids ) )1679 for check_str in strings_displayed:1680 self.check_page_for_string( check_str )1681 tc.fv( 'share', 'email', emails )1682 tc.submit( 'share_button' )1683 for check_str in strings_displayed_after_submit:1684 self.check_page_for_string( check_str )1685 if action:1686 # If we have an action, then we are sharing datasets with users that do not have access permissions on them1687 tc.fv( 'share_restricted', 'action', action )1688 tc.submit( "share_restricted_button" )1689 for check_str in action_strings_displayed:1690 self.check_page_for_string( check_str )1691 def show_cookies( self ):1692 return tc.show_cookies()1693 def showforms( self ):1694 """Shows form, helpful for debugging new tests"""1695 return tc.showforms()1696 def start_sample_datasets_transfer( self, sample_id, sample_dataset_ids, strings_displayed=[], strings_displayed_after_submit=[], strings_displayed_count=[], strings_not_displayed=[] ):1697 url = '%s/requests_admin/manage_datasets?cntrller=requests_admin&sample_id=%s' % ( self.url, sample_id )1698 self.visit_url( url )1699 for check_str in strings_displayed:1700 self.check_page_for_string( check_str )1701 # simulate selecting datasets and clicking the transfer button on the sample datasets grid1702 sample_dataset_ids_string = ','.join( sample_dataset_ids )1703 url = "%s/requests_admin/manage_datasets?operation=transfer&id=%s" % ( self.url, sample_dataset_ids_string )1704 self.visit_url( url )1705 for check_str in strings_displayed_after_submit:1706 self.check_page_for_string( check_str )1707 for check_str in strings_not_displayed:1708 self.check_string_not_in_page( check_str )1709 for check_str, count in strings_displayed_count:1710 self.check_string_count_in_page( check_str, count )1711 def submit_form( self, form_no=0, button="runtool_btn", **kwd ):1712 """Populates and submits a form from the keyword arguments."""1713 # An HTMLForm contains a sequence of Controls. Supported control classes are:1714 # TextControl, FileControl, ListControl, RadioControl, CheckboxControl, SelectControl,1715 # SubmitControl, ImageControl1716 for i, f in enumerate( self.showforms() ):1717 if i == form_no:1718 break1719 # To help with debugging a tool, print out the form controls when the test fails1720 print "form '%s' contains the following controls ( note the values )" % f.name1721 controls = {}1722 formcontrols = self.get_form_controls( f )1723 hc_prefix = '<HiddenControl('1724 for i, control in enumerate( f.controls ):1725 if hc_prefix not in str( control ):1726 try:1727 # check if a repeat element needs to be added1728 if control.name is not None:1729 if control.name not in kwd and control.name.endswith( '_add' ):1730 # control name doesn't exist, could be repeat1731 repeat_startswith = control.name[0:-4]1732 if repeat_startswith and not [ c_name for c_name in controls.keys() if c_name.startswith( repeat_startswith ) ] and [ c_name for c_name in kwd.keys() if c_name.startswith( repeat_startswith ) ]:1733 tc.browser.clicked( f, control )1734 tc.submit( control.name )1735 return self.submit_form( form_no=form_no, button=button, **kwd )1736 # Check for refresh_on_change attribute, submit a change if required1737 if hasattr( control, 'attrs' ) and 'refresh_on_change' in control.attrs.keys():1738 changed = False1739 # For DataToolParameter, control.value is the HDA id, but kwd contains the filename.1740 # This loop gets the filename/label for the selected values.1741 item_labels = [ item.attrs[ 'label' ] for item in control.get_items() if item.selected ]1742 for value in kwd[ control.name ]:1743 if value not in control.value and True not in [ value in item_label for item_label in item_labels ]:1744 changed = True1745 break1746 if changed:1747 # Clear Control and set to proper value1748 control.clear()1749 # kwd[control.name] should be a singlelist1750 for elem in kwd[ control.name ]:1751 tc.fv( f.name, control.name, str( elem ) )1752 # Create a new submit control, allows form to refresh, instead of going to next page1753 control = ClientForm.SubmitControl( 'SubmitControl', '___refresh_grouping___', {'name': 'refresh_grouping'} )1754 control.add_to_form( f )1755 control.fixup()1756 # Submit for refresh1757 tc.submit( '___refresh_grouping___' )1758 return self.submit_form( form_no=form_no, button=button, **kwd )1759 except Exception:1760 log.exception( "In submit_form, continuing, but caught exception." )1761 for formcontrol in formcontrols:1762 log.debug( formcontrol )1763 continue1764 controls[ control.name ] = control1765 # No refresh_on_change attribute found in current form, so process as usual1766 for control_name, control_value in kwd.items():1767 if control_name not in controls:1768 continue # these cannot be handled safely - cause the test to barf out1769 if not isinstance( control_value, list ):1770 control_value = [ control_value ]1771 control = controls[ control_name ]1772 control.clear()1773 if control.is_of_kind( "text" ):1774 tc.fv( f.name, control.name, ",".join( control_value ) )1775 elif control.is_of_kind( "list" ):1776 try:1777 if control.is_of_kind( "multilist" ):1778 if control.type == "checkbox":1779 def is_checked( value ):1780 # Copied from form_builder.CheckboxField1781 if value is True:1782 return True1783 if isinstance( value, list ):1784 value = value[0]1785 return isinstance( value, basestring ) and value.lower() in ( "yes", "true", "on" )1786 try:1787 checkbox = control.get()1788 checkbox.selected = is_checked( control_value )1789 except Exception, e1:1790 print "Attempting to set checkbox selected value threw exception: ", e11791 # if there's more than one checkbox, probably should use the behaviour for1792 # ClientForm.ListControl ( see twill code ), but this works for now...1793 for elem in control_value:1794 control.get( name=elem ).selected = True1795 else:1796 for elem in control_value:1797 try:1798 # Doubt this case would ever work, but want1799 # to preserve backward compat.1800 control.get( name=elem ).selected = True1801 except Exception:1802 # ... anyway this is really what we want to1803 # do, probably even want to try the len(1804 # elem ) > 30 check below.1805 control.get( label=elem ).selected = True1806 else: # control.is_of_kind( "singlelist" )1807 for elem in control_value:1808 try:1809 tc.fv( f.name, control.name, str( elem ) )1810 except Exception:1811 try:1812 # Galaxy truncates long file names in the dataset_collector in galaxy/tools/parameters/basic.py1813 if len( elem ) > 30:1814 elem_name = '%s..%s' % ( elem[:17], elem[-11:] )1815 tc.fv( f.name, control.name, str( elem_name ) )1816 pass1817 else:1818 raise1819 except Exception:1820 raise1821 except Exception:1822 for formcontrol in formcontrols:1823 log.debug( formcontrol )1824 log.exception( "Attempting to set control '%s' to value '%s' (also tried '%s') threw exception.", control.name, elem, elem_name )1825 pass1826 except Exception, exc:1827 for formcontrol in formcontrols:1828 log.debug( formcontrol )1829 errmsg = "Attempting to set field '%s' to value '%s' in form '%s' threw exception: %s\n" % ( control_name, str( control_value ), f.name, str( exc ) )1830 errmsg += "control: %s\n" % str( control )1831 errmsg += "If the above control is a DataToolparameter whose data type class does not include a sniff() method,\n"1832 errmsg += "make sure to include a proper 'ftype' attribute to the tag for the control within the <test> tag set.\n"1833 raise AssertionError( errmsg )1834 else:1835 # Add conditions for other control types here when necessary.1836 pass1837 tc.submit( button )1838 def submit_request( self, cntrller, request_id, request_name, strings_displayed_after_submit=[] ):1839 self.visit_url( "%s/requests_common/submit_request?cntrller=%s&id=%s" % ( self.url, cntrller, request_id ) )1840 for check_str in strings_displayed_after_submit:1841 self.check_page_for_string( check_str )1842 def switch_history( self, id='', name='' ):1843 """Switches to a history in the current list of histories"""1844 params = dict( operation='switch', id=id )1845 self.visit_url( "/history/list", params )1846 if name:1847 self.check_history_for_exact_string( name )1848 def undelete_group( self, group_id, group_name ):1849 """Undelete an existing group"""1850 self.visit_url( "%s/admin/groups?operation=undelete&id=%s" % ( self.url, group_id ) )1851 check_str = "Undeleted 1 groups: %s" % group_name1852 self.check_page_for_string( check_str )1853 def undelete_history_item( self, hda_id, strings_displayed=[] ):1854 """Un-deletes a deleted item in a history"""1855 try:1856 hda_id = int( hda_id )1857 except:1858 raise AssertionError( "Invalid hda_id '%s' - must be int" % hda_id )1859 self.visit_url( "%s/datasets/%s/undelete" % ( self.url, self.security.encode_id( hda_id ) ) )1860 for check_str in strings_displayed:1861 self.check_page_for_string( check_str )1862 def undelete_library_item( self, cntrller, library_id, item_id, item_name, item_type='library_dataset' ):1863 """Mark a library item as deleted"""1864 params = dict( cntrller=cntrller, library_id=library_id, item_id=item_id, item_type=item_type )1865 self.visit_url( "/library_common/undelete_library_item", params )1866 if item_type == 'library_dataset':1867 item_desc = 'Dataset'1868 else:1869 item_desc = item_type.capitalize()1870 check_str = "marked undeleted"1871 self.check_for_strings( strings_displayed=[ item_desc, check_str ] )1872 def undelete_role( self, role_id, role_name ):1873 """Undelete an existing role"""1874 self.visit_url( "%s/admin/roles?operation=undelete&id=%s" % ( self.url, role_id ) )1875 check_str = "Undeleted 1 roles: %s" % role_name1876 self.check_page_for_string( check_str )1877 def undelete_user( self, user_id, email='' ):1878 """Undelete a user"""1879 self.visit_url( "%s/admin/users?operation=undelete&id=%s" % ( self.url, user_id ) )1880 check_str = "Undeleted 1 users"1881 self.check_page_for_string( check_str )1882 def unshare_history( self, history_id, user_id, strings_displayed=[] ):1883 """Unshare a history that has been shared with another user"""1884 self.visit_url( "/history/sharing", params=dict( id=history_id ) )1885 for check_str in strings_displayed:1886 self.check_page_for_string( check_str )1887 self.visit_url( "/history/sharing", params=dict( unshare_user=user_id, id=history_id ) )1888 def upload_library_dataset( self, cntrller, library_id, folder_id, filename='', server_dir='', replace_id='',1889 upload_option='upload_file', file_type='auto', dbkey='hg18', space_to_tab='',1890 link_data_only='copy_files', preserve_dirs='Yes', filesystem_paths='', roles=[],1891 ldda_message='', hda_ids='', template_refresh_field_name='1_field_name',1892 template_refresh_field_contents='', template_fields=[], show_deleted='False', strings_displayed=[] ):1893 """Add datasets to library using any upload_option"""1894 # NOTE: due to the library_wait() method call at the end of this method, no tests should be done1895 # for strings_displayed_after_submit.1896 params = dict( cntrller=cntrller, library_id=library_id, folder_id=folder_id )1897 url = "/library_common/upload_library_dataset"1898 if replace_id:1899 # If we're uploading a new version of a library dataset, we have to include the replace_id param in the1900 # request because the form field named replace_id will not be displayed on the upload form if we dont.1901 params[ 'replace_id' ] = replace_id1902 self.visit_url( url, params=params )1903 if template_refresh_field_contents:1904 # A template containing an AddressField is displayed on the upload form, so we need to refresh the form1905 # with the received template_refresh_field_contents.1906 self.refresh_form( template_refresh_field_name, template_refresh_field_contents )1907 for tup in template_fields:1908 tc.fv( "1", tup[0], tup[1] )1909 tc.fv( "upload_library_dataset", "library_id", library_id )1910 tc.fv( "upload_library_dataset", "folder_id", folder_id )1911 tc.fv( "upload_library_dataset", "show_deleted", show_deleted )1912 tc.fv( "upload_library_dataset", "ldda_message", ldda_message )1913 tc.fv( "upload_library_dataset", "file_type", file_type )1914 tc.fv( "upload_library_dataset", "dbkey", dbkey )1915 if space_to_tab:1916 tc.fv( "upload_library_dataset", "space_to_tab", space_to_tab )1917 for role_id in roles:1918 tc.fv( "upload_library_dataset", "roles", role_id )1919 # Refresh the form by selecting the upload_option - we do this here to ensure1920 # all previously entered form contents are retained.1921 self.refresh_form( 'upload_option', upload_option )1922 if upload_option == 'import_from_history':1923 for check_str in strings_displayed:1924 self.check_page_for_string( check_str )1925 if hda_ids:1926 # Twill cannot handle multi-checkboxes, so the form can only have 1 hda_ids checkbox1927 try:1928 tc.fv( "add_history_datasets_to_library", "hda_ids", hda_ids )1929 except:1930 tc.fv( "add_history_datasets_to_library", "hda_ids", '1' )1931 tc.submit( 'add_history_datasets_to_library_button' )1932 else:1933 if upload_option in [ 'upload_paths', 'upload_directory' ]:1934 tc.fv( "upload_library_dataset", "link_data_only", link_data_only )1935 if upload_option == 'upload_paths':1936 tc.fv( "upload_library_dataset", "filesystem_paths", filesystem_paths )1937 if upload_option == 'upload_directory' and server_dir:1938 tc.fv( "upload_library_dataset", "server_dir", server_dir )1939 if upload_option == 'upload_file':1940 if filename:1941 filename = self.get_filename( filename )1942 tc.formfile( "upload_library_dataset", "files_0|file_data", filename )1943 for check_str in strings_displayed:1944 self.check_page_for_string( check_str )1945 tc.submit( "runtool_btn" )1946 # Give the files some time to finish uploading1947 self.library_wait( library_id )1948 def upload_file( self, filename, ftype='auto', dbkey='unspecified (?)', space_to_tab=False, metadata=None, composite_data=None, name=None, shed_tool_id=None, wait=True ):1949 """1950 Uploads a file. If shed_tool_id has a value, we're testing tools migrated from the distribution to the tool shed,1951 so the tool-data directory of test data files is contained in the installed tool shed repository.1952 """1953 self.visit_url( "%s/tool_runner?tool_id=upload1" % self.url )1954 try:1955 self.refresh_form( "file_type", ftype ) # Refresh, to support composite files1956 tc.fv( "tool_form", "dbkey", dbkey )1957 if metadata:1958 for elem in metadata:1959 tc.fv( "tool_form", "files_metadata|%s" % elem.get( 'name' ), elem.get( 'value' ) )1960 if composite_data:1961 for i, composite_file in enumerate( composite_data ):1962 filename = self.get_filename( composite_file.get( 'value' ), shed_tool_id=shed_tool_id )1963 tc.formfile( "tool_form", "files_%i|file_data" % i, filename )1964 tc.fv( "tool_form", "files_%i|space_to_tab" % i, composite_file.get( 'space_to_tab', False ) )1965 else:1966 filename = self.get_filename( filename, shed_tool_id=shed_tool_id )1967 tc.formfile( "tool_form", "file_data", filename )1968 tc.fv( "tool_form", "space_to_tab", space_to_tab )1969 if name:1970 # NAME is a hidden form element, so the following prop must1971 # set to use it.1972 tc.config("readonly_controls_writeable", 1)1973 tc.fv( "tool_form", "NAME", name )1974 tc.submit( "runtool_btn" )1975 except AssertionError, err:1976 errmsg = "Uploading file resulted in the following exception. Make sure the file (%s) exists. " % filename1977 errmsg += str( err )1978 raise AssertionError( errmsg )1979 if not wait:1980 return1981 # Make sure every history item has a valid hid1982 hids = self.get_hids_in_history( self.get_latest_history()[ 'id' ] )1983 for hid in hids:1984 try:1985 int( hid )1986 except:1987 raise AssertionError( "Invalid hid (%s) created when uploading file %s" % ( hid, filename ) )1988 # Wait for upload processing to finish (TODO: this should be done in each test case instead)1989 self.wait()1990 def upload_url_paste( self, url_paste, ftype='auto', dbkey='unspecified (?)' ):1991 """Pasted data in the upload utility"""1992 self.visit_url( "/tool_runner/index?tool_id=upload1" )1993 try:1994 self.refresh_form( "file_type", ftype ) # Refresh, to support composite files1995 tc.fv( "tool_form", "dbkey", dbkey )1996 tc.fv( "tool_form", "url_paste", url_paste )1997 tc.submit( "runtool_btn" )1998 except Exception, e:1999 errmsg = "Problem executing upload utility using url_paste: %s" % str( e )2000 raise AssertionError( errmsg )2001 # Make sure every history item has a valid hid2002 hids = self.get_hids_in_history( self.get_latest_history()[ 'id' ] )2003 for hid in hids:2004 try:2005 int( hid )2006 except:2007 raise AssertionError( "Invalid hid (%s) created when pasting %s" % ( hid, url_paste ) )2008 # Wait for upload processing to finish (TODO: this should be done in each test case instead)2009 self.wait()2010 def user_set_default_permissions( self, cntrller='user', permissions_out=[], permissions_in=[], role_id='2' ):2011 # role.id = 2 is Private Role for test2@bx.psu.edu2012 # NOTE: Twill has a bug that requires the ~/user/permissions page to contain at least 1 option value2013 # in each select list or twill throws an exception, which is: ParseError: OPTION outside of SELECT2014 # Due to this bug, we'll bypass visiting the page, and simply pass the permissions on to the2015 # /user/set_default_permissions method.2016 url = "user/set_default_permissions?cntrller=%s&update_roles_button=Save&id=None" % cntrller2017 for po in permissions_out:2018 key = '%s_out' % po2019 url = "%s&%s=%s" % ( url, key, str( role_id ) )2020 for pi in permissions_in:2021 key = '%s_in' % pi2022 url = "%s&%s=%s" % ( url, key, str( role_id ) )2023 self.visit_url( "%s/%s" % ( self.url, url ) )2024 self.check_page_for_string( 'Default new history permissions have been changed.' )2025 def verify_composite_datatype_file_content( self, file_name, hda_id, base_name=None, attributes=None, dataset_fetcher=None, shed_tool_id=None ):2026 dataset_fetcher = dataset_fetcher or self.__default_dataset_fetcher()2027 local_name = self.get_filename( file_name, shed_tool_id=shed_tool_id )2028 if base_name is None:2029 base_name = os.path.split(file_name)[-1]2030 temp_name = self.makeTfname(fname=base_name)2031 data = dataset_fetcher( hda_id, base_name )2032 file( temp_name, 'wb' ).write( data )2033 if self.keepOutdir > '':2034 ofn = os.path.join(self.keepOutdir, base_name)2035 shutil.copy(temp_name, ofn)2036 log.debug('## GALAXY_TEST_SAVE=%s. saved %s' % (self.keepOutdir, ofn))2037 try:2038 if attributes is None:2039 attributes = {}2040 compare = attributes.get( 'compare', 'diff' )2041 if compare == 'diff':2042 self.files_diff( local_name, temp_name, attributes=attributes )2043 elif compare == 're_match':2044 self.files_re_match( local_name, temp_name, attributes=attributes )2045 elif compare == 're_match_multiline':2046 self.files_re_match_multiline( local_name, temp_name, attributes=attributes )2047 elif compare == 'sim_size':2048 delta = attributes.get('delta', '100')2049 s1 = len(data)2050 s2 = os.path.getsize(local_name)2051 if abs(s1 - s2) > int(delta):2052 raise Exception( 'Files %s=%db but %s=%db - compare (delta=%s) failed' % (temp_name, s1, local_name, s2, delta) )2053 else:2054 raise Exception( 'Unimplemented Compare type: %s' % compare )2055 except AssertionError, err:2056 errmsg = 'Composite file (%s) of History item %s different than expected, difference (using %s):\n' % ( base_name, hda_id, compare )2057 errmsg += str( err )2058 raise AssertionError( errmsg )2059 finally:2060 if 'GALAXY_TEST_NO_CLEANUP' not in os.environ:2061 os.remove( temp_name )2062 def verify_dataset_correctness( self, filename, hid=None, wait=True, maxseconds=120, attributes=None, shed_tool_id=None ):2063 """Verifies that the attributes and contents of a history item meet expectations"""2064 if wait:2065 self.wait( maxseconds=maxseconds ) # wait for job to finish2066 data_list = self.get_history_from_api( encoded_history_id=None, show_deleted=False, show_details=False )2067 self.assertTrue( data_list )2068 if hid is None: # take last hid2069 dataset = data_list[-1]2070 hid = str( dataset.get('hid') )2071 else:2072 datasets = [ dataset for dataset in data_list if str( dataset.get('hid') ) == str( hid ) ]2073 self.assertTrue( len( datasets ) == 1 )2074 dataset = datasets[0]2075 self.assertTrue( hid )2076 dataset = self.json_from_url( dataset[ 'url' ] )2077 self._assert_dataset_state( dataset, 'ok' )2078 if filename is not None and self.is_zipped( filename ):2079 errmsg = 'History item %s is a zip archive which includes invalid files:\n' % hid2080 zip_file = zipfile.ZipFile( filename, "r" )2081 name = zip_file.namelist()[0]2082 test_ext = name.split( "." )[1].strip().lower()2083 if not ( test_ext == 'scf' or test_ext == 'ab1' or test_ext == 'txt' ):2084 raise AssertionError( errmsg )2085 for name in zip_file.namelist():2086 ext = name.split( "." )[1].strip().lower()2087 if ext != test_ext:2088 raise AssertionError( errmsg )2089 else:2090 # See not in controllers/root.py about encoded_id.2091 hda_id = dataset.get( 'id' )2092 self.verify_hid( filename, hid=hid, hda_id=hda_id, attributes=attributes, shed_tool_id=shed_tool_id)2093 def verify_extra_files_content( self, extra_files, hda_id, dataset_fetcher, shed_tool_id=None ):2094 files_list = []2095 for extra_type, extra_value, extra_name, extra_attributes in extra_files:2096 if extra_type == 'file':2097 files_list.append( ( extra_name, extra_value, extra_attributes ) )2098 elif extra_type == 'directory':2099 for filename in os.listdir( self.get_filename( extra_value, shed_tool_id=shed_tool_id ) ):2100 files_list.append( ( filename, os.path.join( extra_value, filename ), extra_attributes ) )2101 else:2102 raise ValueError( 'unknown extra_files type: %s' % extra_type )2103 for filename, filepath, attributes in files_list:2104 self.verify_composite_datatype_file_content( filepath, hda_id, base_name=filename, attributes=attributes, dataset_fetcher=dataset_fetcher, shed_tool_id=shed_tool_id )2105 def verify_hid( self, filename, hda_id, attributes, shed_tool_id, hid="", dataset_fetcher=None):2106 dataset_fetcher = dataset_fetcher or self.__default_dataset_fetcher()2107 data = dataset_fetcher( hda_id )2108 if attributes is not None and attributes.get( "assert_list", None ) is not None:2109 try:2110 verify_assertions(data, attributes["assert_list"])2111 except AssertionError, err:2112 errmsg = 'History item %s different than expected\n' % (hid)2113 errmsg += str( err )2114 raise AssertionError( errmsg )2115 if attributes is not None and attributes.get("md5", None) is not None:2116 md5 = attributes.get("md5")2117 try:2118 self._verify_md5(data, md5)2119 except AssertionError, err:2120 errmsg = 'History item %s different than expected\n' % (hid)2121 errmsg += str( err )2122 raise AssertionError( errmsg )2123 if filename is not None:2124 local_name = self.get_filename( filename, shed_tool_id=shed_tool_id )2125 temp_name = self.makeTfname(fname=filename)2126 file( temp_name, 'wb' ).write( data )2127 # if the server's env has GALAXY_TEST_SAVE, save the output file to that dir2128 if self.keepOutdir:2129 ofn = os.path.join( self.keepOutdir, os.path.basename( local_name ) )2130 log.debug( 'keepoutdir: %s, ofn: %s', self.keepOutdir, ofn )2131 try:2132 shutil.copy( temp_name, ofn )2133 except Exception, exc:2134 error_log_msg = ( 'TwillTestCase could not save output file %s to %s: ' % ( temp_name, ofn ) )2135 error_log_msg += str( exc )2136 log.error( error_log_msg, exc_info=True )2137 else:2138 log.debug('## GALAXY_TEST_SAVE=%s. saved %s' % ( self.keepOutdir, ofn ) )2139 try:2140 if attributes is None:2141 attributes = {}2142 compare = attributes.get( 'compare', 'diff' )2143 if attributes.get( 'ftype', None ) == 'bam':2144 local_fh, temp_name = self._bam_to_sam( local_name, temp_name )2145 local_name = local_fh.name2146 extra_files = attributes.get( 'extra_files', None )2147 if compare == 'diff':2148 self.files_diff( local_name, temp_name, attributes=attributes )2149 elif compare == 're_match':2150 self.files_re_match( local_name, temp_name, attributes=attributes )2151 elif compare == 're_match_multiline':2152 self.files_re_match_multiline( local_name, temp_name, attributes=attributes )2153 elif compare == 'sim_size':2154 delta = attributes.get('delta', '100')2155 s1 = len(data)2156 s2 = os.path.getsize(local_name)2157 if abs(s1 - s2) > int(delta):2158 raise Exception( 'Files %s=%db but %s=%db - compare (delta=%s) failed' % (temp_name, s1, local_name, s2, delta) )2159 elif compare == "contains":2160 self.files_contains( local_name, temp_name, attributes=attributes )2161 else:2162 raise Exception( 'Unimplemented Compare type: %s' % compare )2163 if extra_files:2164 self.verify_extra_files_content( extra_files, hda_id, shed_tool_id=shed_tool_id, dataset_fetcher=dataset_fetcher )2165 except AssertionError, err:2166 errmsg = 'History item %s different than expected, difference (using %s):\n' % ( hid, compare )2167 errmsg += "( %s v. %s )\n" % ( local_name, temp_name )2168 errmsg += str( err )2169 raise AssertionError( errmsg )2170 finally:2171 if 'GALAXY_TEST_NO_CLEANUP' not in os.environ:2172 os.remove( temp_name )2173 def _verify_md5( self, data, expected_md5 ):2174 import md52175 m = md5.new()2176 m.update( data )2177 actual_md5 = m.hexdigest()2178 if expected_md5 != actual_md5:2179 template = "Output md5sum [%s] does not match expected [%s]."2180 message = template % (actual_md5, expected_md5)2181 assert False, message2182 def view_external_service( self, external_service_id, strings_displayed=[] ):2183 self.visit_url( '%s/external_service/view_external_service?id=%s' % ( self.url, external_service_id ) )2184 for check_str in strings_displayed:2185 self.check_page_for_string( check_str )2186 def view_form( self, id, form_type='', form_name='', form_desc='', form_layout_name='', field_dicts=[] ):2187 '''View form details'''2188 self.visit_url( "%s/forms/view_latest_form_definition?id=%s" % ( self.url, id ) )2189 # self.check_page_for_string( form_type )2190 self.check_page_for_string( form_name )2191 # self.check_page_for_string( form_desc )2192 self.check_page_for_string( form_layout_name )2193 for i, field_dict in enumerate( field_dicts ):2194 self.check_page_for_string( field_dict[ 'label' ] )2195 self.check_page_for_string( field_dict[ 'desc' ] )2196 self.check_page_for_string( field_dict[ 'type' ] )2197 if field_dict[ 'type' ].lower() == 'selectfield':2198 for option_index, option in enumerate( field_dict[ 'selectlist' ] ):2199 self.check_page_for_string( option )2200 def view_history( self, history_id, strings_displayed=[] ):2201 """Displays a history for viewing"""2202 self.visit_url( '%s/history/view?id=%s' % ( self.url, self.security.encode_id( history_id ) ) )2203 for check_str in strings_displayed:2204 self.check_page_for_string( check_str )2205 def view_request( self, cntrller, request_id, strings_displayed=[], strings_displayed_count=[], strings_not_displayed=[] ):2206 self.visit_url( "%s/%s/browse_requests?operation=view_request&id=%s" % ( self.url, cntrller, request_id ) )2207 self.check_page( strings_displayed, strings_displayed_count, strings_not_displayed )2208 def view_request_history( self, cntrller, request_id, strings_displayed=[], strings_displayed_count=[], strings_not_displayed=[] ):2209 self.visit_url( "%s/requests_common/view_request_history?cntrller=%s&id=%s" % ( self.url, cntrller, request_id ) )2210 self.check_page( strings_displayed, strings_displayed_count, strings_not_displayed )2211 def view_request_type( self, request_type_id, request_type_name, sample_states, strings_displayed=[] ):2212 '''View request_type details'''2213 self.visit_url( "%s/request_type/view_request_type?id=%s" % ( self.url, request_type_id ) )2214 self.check_page_for_string( '"%s" request type' % request_type_name )2215 for name, desc in sample_states:2216 self.check_page_for_string( name )2217 self.check_page_for_string( desc )2218 for check_str in strings_displayed:2219 self.check_page_for_string( check_str )2220 def view_sample_dataset( self, sample_dataset_id, strings_displayed=[], strings_displayed_count=[], strings_not_displayed=[] ):2221 self.visit_url( "%s/requests_admin/manage_datasets?operation=view&id=%s" % ( self.url, sample_dataset_id ) )2222 self.check_page( strings_displayed, strings_displayed_count, strings_not_displayed )2223 def view_sample_history( self, cntrller, sample_id, strings_displayed=[], strings_displayed_count=[], strings_not_displayed=[] ):2224 self.visit_url( "%s/requests_common/view_sample_history?cntrller=%s&sample_id=%s" % ( self.url, cntrller, sample_id ) )2225 self.check_page( strings_displayed, strings_displayed_count, strings_not_displayed )2226 def view_shared_histories( self, strings_displayed=[] ):2227 self.visit_url( "/history/list_shared" )2228 for check_str in strings_displayed:2229 self.check_page_for_string( check_str )2230 def view_stored_active_histories( self, strings_displayed=[] ):2231 self.visit_url( "/history/list" )2232 self.check_page_for_string( 'Saved Histories' )2233 self.check_page_for_string( 'operation=Rename' )2234 self.check_page_for_string( 'operation=Switch' )2235 self.check_page_for_string( 'operation=Delete' )2236 for check_str in strings_displayed:2237 self.check_page_for_string( check_str )2238 def view_stored_deleted_histories( self, strings_displayed=[] ):2239 self.visit_url( "/history/list?f-deleted=True" )2240 self.check_page_for_string( 'Saved Histories' )2241 self.check_page_for_string( 'operation=Undelete' )2242 for check_str in strings_displayed:2243 self.check_page_for_string( check_str )2244 def visit_url( self, url, params=None, doseq=False, allowed_codes=[ 200 ] ):2245 if params is None:2246 params = dict()2247 parsed_url = urlparse( url )2248 if len( parsed_url.netloc ) == 0:2249 url = 'http://%s:%s%s' % ( self.host, self.port, parsed_url.path )2250 else:2251 url = '%s://%s%s' % ( parsed_url.scheme, parsed_url.netloc, parsed_url.path )2252 if parsed_url.query:2253 for query_parameter in parsed_url.query.split( '&' ):2254 key, value = query_parameter.split( '=' )2255 params[ key ] = value2256 if params:2257 url += '?%s' % urllib.urlencode( params, doseq=doseq )2258 new_url = tc.go( url )2259 return_code = tc.browser.get_code()2260 assert return_code in allowed_codes, 'Invalid HTTP return code %s, allowed codes: %s' % \2261 ( return_code, ', '.join( str( code ) for code in allowed_codes ) )2262 return new_url2263 def wait( self, **kwds ):2264 """Waits for the tools to finish"""2265 return self.wait_for(lambda: self.get_running_datasets(), **kwds)2266 def wait_for( self, func, **kwd ):2267 sleep_amount = 0.22268 slept = 02269 walltime_exceeded = kwd.get("maxseconds", None)2270 if walltime_exceeded is None:2271 walltime_exceeded = DEFAULT_TOOL_TEST_WAIT2272 exceeded = True2273 while slept <= walltime_exceeded:2274 result = func()2275 if result:2276 time.sleep( sleep_amount )2277 slept += sleep_amount2278 sleep_amount *= 22279 else:2280 exceeded = False2281 break2282 if exceeded:2283 message = 'Tool test run exceeded walltime [total %s, max %s], terminating.' % (slept, walltime_exceeded)2284 log.info(message)2285 raise AssertionError(message)2286 def write_temp_file( self, content, suffix='.html' ):2287 fd, fname = tempfile.mkstemp( suffix=suffix, prefix='twilltestcase-' )2288 f = os.fdopen( fd, "w" )2289 f.write( content )2290 f.close()2291 return fname2292 def _assert_dataset_state( self, dataset, state ):2293 if dataset.get( 'state' ) != state:2294 blurb = dataset.get( 'misc_blurb' )2295 errmsg = "Expecting dataset state '%s', but state is '%s'. Dataset blurb: %s\n\n" % ( state, dataset.get('state'), blurb.strip() )2296 errmsg += self.get_job_stderr( dataset.get( 'id' ), format=True )2297 raise AssertionError( errmsg )2298 def _check_command(self, command, description):2299 # TODO: also collect ``which samtools`` and ``samtools --version``2300 p = subprocess.Popen( args=command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True )2301 (stdout, stderr) = p.communicate()2302 if p.returncode:2303 template = description2304 template += " failed: (cmd=[%s], stdout=[%s], stderr=[%s])"2305 message = template % (command, stdout, stderr)2306 raise AssertionError(message)2307 def _bam_to_sam( self, local_name, temp_name ):2308 temp_local = tempfile.NamedTemporaryFile( suffix='.sam', prefix='local_bam_converted_to_sam_' )2309 fd, temp_temp = tempfile.mkstemp( suffix='.sam', prefix='history_bam_converted_to_sam_' )2310 os.close( fd )2311 command = 'samtools view -h -o "%s" "%s"' % ( temp_local.name, local_name )2312 self._check_command( command, 'Converting local (test-data) bam to sam' )2313 command = 'samtools view -h -o "%s" "%s"' % ( temp_temp, temp_name )2314 self._check_command( command, 'Converting history bam to sam ' )2315 os.remove( temp_name )2316 return temp_local, temp_temp2317 def _format_stream( self, output, stream, format ):2318 if format:2319 msg = "---------------------- >> begin tool %s << -----------------------\n" % stream2320 msg += output + "\n"2321 msg += "----------------------- >> end tool %s << ------------------------\n" % stream2322 else:2323 msg = output2324 return msg2325 def _get_job_stream_output( self, hda_id, stream, format ):2326 self.visit_url( "/datasets/%s/%s" % ( hda_id, stream ) )2327 output = self.last_page()2328 return self._format_stream( output, stream, format )2329 def __default_dataset_fetcher( self ):2330 def fetcher( hda_id, filename=None ):2331 if filename is None:2332 page_url = "/display?encoded_id=%s" % hda_id2333 else:2334 page_url = "/datasets/%s/display/%s" % ( hda_id, filename )2335 self.visit_url( page_url )2336 data = self.last_page()2337 return data...
random_visit.py
Source:random_visit.py
1#!/usr/bin/env python32# -*- coding: utf-8 -*-3# @Date : 2019-7-28 11:57:024# @Author : Albert Shi5# @Link : http://blog.csdn.net/albertsh6# @Github : https://github.com/AlbertGithubHome7__author__ = 'AlbertS'8# @Subject : random visit url of list9import time10import random11import toolbox12import requests13import datetime14import collect_articlelist15def random_visit(count, url_list):16 try:17 for x in range(0,count):18 visit_url = random.choice(url_list)19 head = {'User-Agent': toolbox.get_random_user_agent(), 'Connection': 'keep-alive'}20 requests.get(visit_url, headers=head)21 print("curtime = {0}, visit_url={1}".format(22 datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), visit_url))23 time.sleep(random.uniform(3.14, 6.18))24 except:25 print("try error")26def proxy_visit():27 try:28 visit_url = 'https://blog.csdn.net/albertsh/article/details/73512880'29 head = {'User-Agent': toolbox.get_random_user_agent(), 'Connection': 'keep-alive'}30 proxy_ip_port = toolbox.get_random_proxy()31 proxy_data = {'http': proxy_ip_port, 'https': proxy_ip_port}32 print("curtime = {0}, proxy= {1}, visit_url={2}".format(33 datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), proxy_ip_port, visit_url))34 35 requests.get(visit_url, headers=head, proxies=proxy_data)36 print("visit success")37 except:38 print("try error")39if __name__ == '__main__':40 proxy_visit()41 #random_visit(10, collect_articlelist.get_aticle_list())...
check_power.py
Source:check_power.py
1from django.http import HttpResponse2from manager.models import power3def check_power(func):4 def inner_fun(request):5 visit_url = request.resolver_match.url_name6 # print(visit_url)7 power_info = power.objects.filter(url=visit_url).first()8 if power_info != None:9 user_power_list = request.session.get('user_power_list')10 # print(user_power_list)11 if visit_url not in user_power_list:12 return HttpResponse('æ¨æ²¡æ访é®è¯¥æ¹æ³çæé')13 result = func(request)14 return result...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!