Best Python code snippet using hypothesis
ga_handler.py
Source:ga_handler.py
1import sys2from copy import deepcopy3from ga_approach import *4from chromosome import Chromosome5from simulation import Simulation6class GAHandler:7 def __init__(self, problem):8 self.__original_stdout = sys.stdout9 self.problem = problem10 self.mutation_probability = 1011 self.crossover_probability = 8512 def representation_by_class(self, plain_chromosome):13 not_plain = []14 # Convert to class representation15 for id_job, id_task, id_machine in plain_chromosome:16 chunk = []17 job = self.problem.job(id_job)18 chunk.append(job)19 task = job.task(id_task)20 chunk.append(task)21 operation = task.operation(id_machine)22 chunk.append(operation)23 not_plain.append(chunk)24 return not_plain25 def create_initial_population(self, number_of_individuals, approach):26 class_population = []27 population = approach(number_of_individuals, self.problem.jobs)28 for individual in population:29 class_population.append(self.representation_by_class(individual))30 if not Chromosome.is_valid(self.representation_by_class(individual)):31 print("INVALIDITY AT STARTUP")32 return class_population33 def k_tournament(self, population, k):34 if len(population) < k and len(population) != 0:35 return population[0]36 parents = []37 for i in range(0, k):38 parent = None39 while True:40 parent = random.choice(population)41 if parent not in parents:42 break43 parents.append(parent)44 return min(parents, key=lambda x: Chromosome.compute_makespan(x, self.problem))45 def selection_and_crossover(self, population, k, elite_eval=False):46 new_population = []47 # Selection48 selected = GASelection.k_tournament(population, k, self.problem)49 # Crossover50 def pairwise(iterable):51 a = iter(iterable)52 return zip(a, a)53 for p1, p2 in pairwise(selected):54 new_population.extend(self.crossover([p1, p2]))55 if elite_eval:56 return GASelection.elite_evaluation(new_population, population, self.problem)57 return new_population58 def crossover(self, parents):59 children = []60 probability = random.randint(0, 100)61 if probability > self.crossover_probability:62 children.extend(parents)63 else:64 child = self.ipox_crossover(parents)65 children.append(child)66 if not Chromosome.is_valid(child):67 print("INVALIDITY AT crossover")68 child = self.ipox_crossover(parents, reverse=True)69 children.append(child)70 if not Chromosome.is_valid(child):71 print("INVALIDITY AT crossover")72 return children73 def ipox_crossover(self, parents, reverse=False):74 assert (len(parents) == 2)75 # For reverse76 if reverse:77 temp = parents[0]78 parents[0] = parents[1]79 parents[1] = temp80 # Consider first parent. Select one random job from it.81 job_selected = random.choice(self.problem.jobs)82 # From first parent, get position of related operations83 task_op_and_index = {}84 index = 085 for job, task, operation in parents[0]:86 if job.id_job == job_selected.id_job:87 task_op_and_index.update({index: [job, task, operation]})88 index += 189 # From second parent, get all operations except those beloging to job_selected90 child = []91 for job, task, operation in parents[1]:92 if job.id_job != job_selected.id_job:93 child.append([job, task, operation])94 # Then add the operation in specified index from parent95 for index, gene in task_op_and_index.items():96 child.insert(index, gene)97 return child98 def mutation(self, population, approach):99 new_population = []100 for chromosome in population:101 child = approach(self.problem, chromosome, self.mutation_probability)102 new_population.append(child)103 return new_population104 def best_makespan(self, population):105 best_makespan = 9999999999106 best_chromosome = None107 for chromosome in population:108 makespan = Chromosome.compute_makespan(chromosome, self.problem)109 if makespan < best_makespan:110 best_makespan = makespan111 best_chromosome = chromosome112 return best_makespan, best_chromosome113 def runBroad(self, population_size, generation_max, iterations, reachable_max=999999):114 import time115 t = time.time()116 # BROAD Approach117 sum_makespan = 0118 sum_generation = 0119 sum_initial = 0120 all_time_winner = None121 winner_makespan = 999999122 for iter in range(iterations):123 print("\n### BROAD APPROACH ### ITERATION: " + str(iter + 1) + "\n")124 initial_population = self.create_initial_population(population_size,125 GAInitializations.longest_shortest_init)126 generation_number = 0127 best_makespan, gen_winner = self.best_makespan(initial_population)128 # For calculating the average initial129 sum_initial += best_makespan130 best_generation = 1131 while generation_number < generation_max:132 intermediate_population = self.selection_and_crossover(initial_population, 3,133 elite_eval=True)134 new_population = self.mutation(intermediate_population, GAMutation.precedence_reserving_mutation)135 # new_population = intermediate_population136 makespan, supposed_winner = self.best_makespan(new_population)137 initial_population = new_population138 if makespan < best_makespan:139 best_makespan = makespan140 gen_winner = Chromosome.plain_representation(supposed_winner)141 best_generation = generation_number142 # Sum Up143 print("GENERATION | NUMBER", generation_number + 1, "BEST MAKESPAN", best_makespan)144 if best_makespan == reachable_max:145 break146 generation_number += 1147 sum_makespan += best_makespan148 sum_generation += best_generation149 print("ENDED. BEST MAKESPAN", best_makespan, "AT GEN", best_generation)150 print("BEST CHROMOSOME\n", gen_winner)151 if winner_makespan > best_makespan:152 winner_makespan = best_makespan153 all_time_winner = gen_winner154 # do stuff155 elapsed = time.time() - t156 print("\n\n#### SIMULATION ENDED BROAD ###\nValues:")157 print("Best Overall Makespan value:" + str(winner_makespan))158 print("Average Makespan value:" + str(sum_makespan / iterations))159 print("Average Generation value:" + str(sum_generation / iterations))160 print("Average Initial value:" + str(sum_initial / iterations))161 print("Total Computation time:" + str(elapsed) + "\n")162 all_time_winner = self.representation_by_class(all_time_winner)163 Simulation.run_for_drawing(self.problem, all_time_winner, filename="broad")164 Simulation.run_for_drawing_gantt(self.problem, all_time_winner)165 def runConditional(self, population_size, generation_max, iterations, reachable_max=999999):166 import time167 t = time.time()168 # BROAD Approach169 sum_makespan = 0170 sum_generation = 0171 sum_initial = 0172 all_time_winner = None173 winner_makespan = 999999174 for iter in range(iterations):175 print("\n### CONDITIONAL APPROACH ### ITERATION: " + str(iter + 1) + "\n")176 initial_population = self.create_initial_population(population_size,177 GAInitializations.permute_init)178 generation_number = 0179 best_makespan, gen_winner = self.best_makespan(initial_population)180 # For calculating the average initial181 sum_initial += best_makespan182 best_generation = 1183 while generation_number < generation_max:184 intermediate_population = self.selection_and_crossover(initial_population, 2)185 new_population = self.mutation(intermediate_population,186 GAMutation.conditional_precedence_reserving_mutation)187 # new_population = intermediate_population188 makespan, supposed_winner = self.best_makespan(new_population)189 initial_population = new_population190 if makespan < best_makespan:191 best_makespan = makespan192 gen_winner = Chromosome.plain_representation(supposed_winner)193 best_generation = generation_number194 # Sum Up195 print("GENERATION | NUMBER", generation_number + 1, "BEST MAKESPAN", best_makespan)196 if best_makespan == reachable_max:197 break198 generation_number += 1199 sum_makespan += best_makespan200 sum_generation += best_generation201 print("ENDED. BEST MAKESPAN", best_makespan, "AT GEN", best_generation)202 print("BEST CHROMOSOME\n", gen_winner)203 if winner_makespan > best_makespan:204 winner_makespan = best_makespan205 all_time_winner = gen_winner206 # do stuff207 elapsed = time.time() - t208 print("\n\n#### SIMULATION ENDED CONDITIONAL ###\nValues:")209 print("Best Overall Makespan value:" + str(winner_makespan))210 print("Average Makespan value:" + str(sum_makespan / iterations))211 print("Average Generation value:" + str(sum_generation / iterations))212 print("Average Initial value:" + str(sum_initial / iterations))213 print("Total Computation time:" + str(elapsed) + "\n")214 all_time_winner = self.representation_by_class(all_time_winner)215 Simulation.run_for_drawing(self.problem, all_time_winner, filename="conditional")216 Simulation.run_for_drawing_gantt(self.problem, all_time_winner)217 def runMixed01(self, population_size, generation_max, iterations, reachable_max=999999):218 import time219 t = time.time()220 # BROAD Approach221 sum_makespan = 0222 sum_generation = 0223 sum_initial = 0224 all_time_winner = None225 winner_makespan = 999999226 for iter in range(iterations):227 print("\n### MIXED APPROACH ### ITERATION: " + str(iter + 1) + "\n")228 initial_population = self.create_initial_population(population_size,229 GAInitializations.longest_shortest_init)230 generation_number = 0231 best_makespan, gen_winner = self.best_makespan(initial_population)232 # For calculating the average initial233 sum_initial += best_makespan234 best_generation = 1235 while generation_number < generation_max:236 intermediate_population = self.selection_and_crossover(initial_population, 3,237 elite_eval=True)238 new_population = self.mutation(intermediate_population, GAMutation.conditional_precedence_reserving_mutation)239 # new_population = intermediate_population240 makespan, supposed_winner = self.best_makespan(new_population)241 initial_population = new_population242 if makespan < best_makespan:243 best_makespan = makespan244 gen_winner = Chromosome.plain_representation(supposed_winner)245 best_generation = generation_number246 # Sum Up247 print("GENERATION | NUMBER", generation_number + 1, "BEST MAKESPAN", best_makespan)248 if best_makespan == reachable_max:249 break250 generation_number += 1251 sum_makespan += best_makespan252 sum_generation += best_generation253 print("ENDED. BEST MAKESPAN", best_makespan, "AT GEN", best_generation)254 print("BEST CHROMOSOME\n", gen_winner)255 if winner_makespan > best_makespan:256 winner_makespan = best_makespan257 all_time_winner = gen_winner258 # do stuff259 elapsed = time.time() - t260 print("\n\n#### SIMULATION ENDED MIXED ###\nValues:")261 print("Best Overall Makespan value:" + str(winner_makespan))262 print("Average Makespan value:" + str(sum_makespan / iterations))263 print("Average Generation value:" + str(sum_generation / iterations))264 print("Average Initial value:" + str(sum_initial / iterations))265 print("Total Computation time:" + str(elapsed) + "\n")266 all_time_winner = self.representation_by_class(all_time_winner)267 Simulation.run_for_drawing(self.problem, all_time_winner, filename="mixed")268 Simulation.run_for_drawing_gantt(self.problem, all_time_winner)269 def runMixed02(self, population_size, generation_max, iterations, reachable_max=999999):270 import time271 t = time.time()272 # BROAD Approach273 sum_makespan = 0274 sum_generation = 0275 sum_initial = 0276 all_time_winner = None277 winner_makespan = 999999278 for iter in range(iterations):279 print("\n### MIXED APPROACH ### ITERATION: " + str(iter + 1) + "\n")280 initial_population = self.create_initial_population(population_size,281 GAInitializations.permute_init)282 generation_number = 0283 best_makespan, gen_winner = self.best_makespan(initial_population)284 # For calculating the average initial285 sum_initial += best_makespan286 best_generation = 1287 while generation_number < generation_max:288 intermediate_population = self.selection_and_crossover(initial_population, 3,289 elite_eval=True)290 new_population = self.mutation(intermediate_population,291 GAMutation.precedence_reserving_mutation)292 # new_population = intermediate_population293 makespan, supposed_winner = self.best_makespan(new_population)294 initial_population = new_population295 if makespan < best_makespan:296 best_makespan = makespan297 gen_winner = Chromosome.plain_representation(supposed_winner)298 best_generation = generation_number299 # Sum Up300 print("GENERATION | NUMBER", generation_number + 1, "BEST MAKESPAN", best_makespan)301 if best_makespan == reachable_max:302 break303 generation_number += 1304 sum_makespan += best_makespan305 sum_generation += best_generation306 print("ENDED. BEST MAKESPAN", best_makespan, "AT GEN", best_generation)307 print("BEST CHROMOSOME\n", gen_winner)308 if winner_makespan > best_makespan:309 winner_makespan = best_makespan310 all_time_winner = gen_winner311 # do stuff312 elapsed = time.time() - t313 print("\n\n#### SIMULATION ENDED MIXED ###\nValues:")314 print("Best Overall Makespan value:" + str(winner_makespan))315 print("Average Makespan value:" + str(sum_makespan / iterations))316 print("Average Generation value:" + str(sum_generation / iterations))317 print("Average Initial value:" + str(sum_initial / iterations))318 print("Total Computation time:" + str(elapsed) + "\n")319 all_time_winner = self.representation_by_class(all_time_winner)320 Simulation.run_for_drawing(self.problem, all_time_winner, filename="mixed")...
test_generation_conditionals.py
Source:test_generation_conditionals.py
1# -*- coding: utf-8 -*-2# Copyright (c) 2013, Google, Inc.3# All rights reserved.4#5# Permission is hereby granted, free of charge, to any person obtaining a6# copy of this software and associated documentation files (the7# "Software"), to deal in the Software without restriction, including8# without limitation the rights to use, copy, modify, merge, publish, dis-9# tribute, sublicense, and/or sell copies of the Software, and to permit10# persons to whom the Software is furnished to do so, subject to the fol-11# lowing conditions:12#13# The above copyright notice and this permission notice shall be included14# in all copies or substantial portions of the Software.15#16# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS17# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-18# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT19# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,20# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,21# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS22# IN THE SOFTWARE.23"""Integration tests for GS versioning support."""24import StringIO25import os26import tempfile27from xml import sax28from boto import handler29from boto.exception import GSResponseError30from boto.gs.acl import ACL31from tests.integration.gs.testcase import GSTestCase32# HTTP Error returned when a generation precondition fails.33VERSION_MISMATCH = "412"34class GSGenerationConditionalsTest(GSTestCase):35 def testConditionalSetContentsFromFile(self):36 b = self._MakeBucket()37 k = b.new_key("foo")38 s1 = "test1"39 fp = StringIO.StringIO(s1)40 with self.assertRaisesRegexp(GSResponseError, VERSION_MISMATCH):41 k.set_contents_from_file(fp, if_generation=999)42 fp = StringIO.StringIO(s1)43 k.set_contents_from_file(fp, if_generation=0)44 g1 = k.generation45 s2 = "test2"46 fp = StringIO.StringIO(s2)47 with self.assertRaisesRegexp(GSResponseError, VERSION_MISMATCH):48 k.set_contents_from_file(fp, if_generation=int(g1)+1)49 fp = StringIO.StringIO(s2)50 k.set_contents_from_file(fp, if_generation=g1)51 self.assertEqual(k.get_contents_as_string(), s2)52 def testConditionalSetContentsFromString(self):53 b = self._MakeBucket()54 k = b.new_key("foo")55 s1 = "test1"56 with self.assertRaisesRegexp(GSResponseError, VERSION_MISMATCH):57 k.set_contents_from_string(s1, if_generation=999)58 k.set_contents_from_string(s1, if_generation=0)59 g1 = k.generation60 s2 = "test2"61 with self.assertRaisesRegexp(GSResponseError, VERSION_MISMATCH):62 k.set_contents_from_string(s2, if_generation=int(g1)+1)63 k.set_contents_from_string(s2, if_generation=g1)64 self.assertEqual(k.get_contents_as_string(), s2)65 def testConditionalSetContentsFromFilename(self):66 s1 = "test1"67 s2 = "test2"68 f1 = tempfile.NamedTemporaryFile(prefix="boto-gs-test", delete=False)69 f2 = tempfile.NamedTemporaryFile(prefix="boto-gs-test", delete=False)70 fname1 = f1.name71 fname2 = f2.name72 f1.write(s1)73 f1.close()74 f2.write(s2)75 f2.close()76 try:77 b = self._MakeBucket()78 k = b.new_key("foo")79 with self.assertRaisesRegexp(GSResponseError, VERSION_MISMATCH):80 k.set_contents_from_filename(fname1, if_generation=999)81 k.set_contents_from_filename(fname1, if_generation=0)82 g1 = k.generation83 with self.assertRaisesRegexp(GSResponseError, VERSION_MISMATCH):84 k.set_contents_from_filename(fname2, if_generation=int(g1)+1)85 k.set_contents_from_filename(fname2, if_generation=g1)86 self.assertEqual(k.get_contents_as_string(), s2)87 finally:88 os.remove(fname1)89 os.remove(fname2)90 def testBucketConditionalSetAcl(self):91 b = self._MakeVersionedBucket()92 k = b.new_key("foo")93 s1 = "test1"94 k.set_contents_from_string(s1)95 g1 = k.generation96 mg1 = k.metageneration97 self.assertEqual(str(mg1), "1")98 b.set_acl("public-read", key_name="foo")99 k = b.get_key("foo")100 g2 = k.generation101 mg2 = k.metageneration102 self.assertEqual(g2, g1)103 self.assertGreater(mg2, mg1)104 with self.assertRaisesRegexp(ValueError, ("Received if_metageneration "105 "argument with no "106 "if_generation argument")):107 b.set_acl("bucket-owner-full-control", key_name="foo",108 if_metageneration=123)109 with self.assertRaisesRegexp(GSResponseError, VERSION_MISMATCH):110 b.set_acl("bucket-owner-full-control", key_name="foo",111 if_generation=int(g2) + 1)112 with self.assertRaisesRegexp(GSResponseError, VERSION_MISMATCH):113 b.set_acl("bucket-owner-full-control", key_name="foo",114 if_generation=g2, if_metageneration=int(mg2) + 1)115 b.set_acl("bucket-owner-full-control", key_name="foo", if_generation=g2)116 k = b.get_key("foo")117 g3 = k.generation118 mg3 = k.metageneration119 self.assertEqual(g3, g2)120 self.assertGreater(mg3, mg2)121 b.set_acl("public-read", key_name="foo", if_generation=g3,122 if_metageneration=mg3)123 def testConditionalSetContentsFromStream(self):124 b = self._MakeBucket()125 k = b.new_key("foo")126 s1 = "test1"127 fp = StringIO.StringIO(s1)128 with self.assertRaisesRegexp(GSResponseError, VERSION_MISMATCH):129 k.set_contents_from_stream(fp, if_generation=999)130 fp = StringIO.StringIO(s1)131 k.set_contents_from_stream(fp, if_generation=0)132 g1 = k.generation133 k = b.get_key("foo")134 s2 = "test2"135 fp = StringIO.StringIO(s2)136 with self.assertRaisesRegexp(GSResponseError, VERSION_MISMATCH):137 k.set_contents_from_stream(fp, if_generation=int(g1)+1)138 fp = StringIO.StringIO(s2)139 k.set_contents_from_stream(fp, if_generation=g1)140 self.assertEqual(k.get_contents_as_string(), s2)141 def testBucketConditionalSetCannedAcl(self):142 b = self._MakeVersionedBucket()143 k = b.new_key("foo")144 s1 = "test1"145 k.set_contents_from_string(s1)146 g1 = k.generation147 mg1 = k.metageneration148 self.assertEqual(str(mg1), "1")149 b.set_canned_acl("public-read", key_name="foo")150 k = b.get_key("foo")151 g2 = k.generation152 mg2 = k.metageneration153 self.assertEqual(g2, g1)154 self.assertGreater(mg2, mg1)155 with self.assertRaisesRegexp(ValueError, ("Received if_metageneration "156 "argument with no "157 "if_generation argument")):158 b.set_canned_acl("bucket-owner-full-control", key_name="foo",159 if_metageneration=123)160 with self.assertRaisesRegexp(GSResponseError, VERSION_MISMATCH):161 b.set_canned_acl("bucket-owner-full-control", key_name="foo",162 if_generation=int(g2) + 1)163 with self.assertRaisesRegexp(GSResponseError, VERSION_MISMATCH):164 b.set_canned_acl("bucket-owner-full-control", key_name="foo",165 if_generation=g2, if_metageneration=int(mg2) + 1)166 b.set_canned_acl("bucket-owner-full-control", key_name="foo",167 if_generation=g2)168 k = b.get_key("foo")169 g3 = k.generation170 mg3 = k.metageneration171 self.assertEqual(g3, g2)172 self.assertGreater(mg3, mg2)173 b.set_canned_acl("public-read", key_name="foo", if_generation=g3,174 if_metageneration=mg3)175 def testBucketConditionalSetXmlAcl(self):176 b = self._MakeVersionedBucket()177 k = b.new_key("foo")178 s1 = "test1"179 k.set_contents_from_string(s1)180 g1 = k.generation181 mg1 = k.metageneration182 self.assertEqual(str(mg1), "1")183 acl_xml = (184 '<ACCESSControlList><EntrIes><Entry>' +185 '<Scope type="AllUsers"></Scope><Permission>READ</Permission>' +186 '</Entry></EntrIes></ACCESSControlList>')187 acl = ACL()188 h = handler.XmlHandler(acl, b)189 sax.parseString(acl_xml, h)190 acl = acl.to_xml()191 b.set_xml_acl(acl, key_name="foo")192 k = b.get_key("foo")193 g2 = k.generation194 mg2 = k.metageneration195 self.assertEqual(g2, g1)196 self.assertGreater(mg2, mg1)197 with self.assertRaisesRegexp(ValueError, ("Received if_metageneration "198 "argument with no "199 "if_generation argument")):200 b.set_xml_acl(acl, key_name="foo", if_metageneration=123)201 with self.assertRaisesRegexp(GSResponseError, VERSION_MISMATCH):202 b.set_xml_acl(acl, key_name="foo", if_generation=int(g2) + 1)203 with self.assertRaisesRegexp(GSResponseError, VERSION_MISMATCH):204 b.set_xml_acl(acl, key_name="foo", if_generation=g2,205 if_metageneration=int(mg2) + 1)206 b.set_xml_acl(acl, key_name="foo", if_generation=g2)207 k = b.get_key("foo")208 g3 = k.generation209 mg3 = k.metageneration210 self.assertEqual(g3, g2)211 self.assertGreater(mg3, mg2)212 b.set_xml_acl(acl, key_name="foo", if_generation=g3,213 if_metageneration=mg3)214 def testObjectConditionalSetAcl(self):215 b = self._MakeVersionedBucket()216 k = b.new_key("foo")217 k.set_contents_from_string("test1")218 g1 = k.generation219 mg1 = k.metageneration220 self.assertEqual(str(mg1), "1")221 k.set_acl("public-read")222 k = b.get_key("foo")223 g2 = k.generation224 mg2 = k.metageneration225 self.assertEqual(g2, g1)226 self.assertGreater(mg2, mg1)227 with self.assertRaisesRegexp(ValueError, ("Received if_metageneration "228 "argument with no "229 "if_generation argument")):230 k.set_acl("bucket-owner-full-control", if_metageneration=123)231 with self.assertRaisesRegexp(GSResponseError, VERSION_MISMATCH):232 k.set_acl("bucket-owner-full-control", if_generation=int(g2) + 1)233 with self.assertRaisesRegexp(GSResponseError, VERSION_MISMATCH):234 k.set_acl("bucket-owner-full-control", if_generation=g2,235 if_metageneration=int(mg2) + 1)236 k.set_acl("bucket-owner-full-control", if_generation=g2)237 k = b.get_key("foo")238 g3 = k.generation239 mg3 = k.metageneration240 self.assertEqual(g3, g2)241 self.assertGreater(mg3, mg2)242 k.set_acl("public-read", if_generation=g3, if_metageneration=mg3)243 def testObjectConditionalSetCannedAcl(self):244 b = self._MakeVersionedBucket()245 k = b.new_key("foo")246 k.set_contents_from_string("test1")247 g1 = k.generation248 mg1 = k.metageneration249 self.assertEqual(str(mg1), "1")250 k.set_canned_acl("public-read")251 k = b.get_key("foo")252 g2 = k.generation253 mg2 = k.metageneration254 self.assertEqual(g2, g1)255 self.assertGreater(mg2, mg1)256 with self.assertRaisesRegexp(ValueError, ("Received if_metageneration "257 "argument with no "258 "if_generation argument")):259 k.set_canned_acl("bucket-owner-full-control",260 if_metageneration=123)261 with self.assertRaisesRegexp(GSResponseError, VERSION_MISMATCH):262 k.set_canned_acl("bucket-owner-full-control",263 if_generation=int(g2) + 1)264 with self.assertRaisesRegexp(GSResponseError, VERSION_MISMATCH):265 k.set_canned_acl("bucket-owner-full-control", if_generation=g2,266 if_metageneration=int(mg2) + 1)267 k.set_canned_acl("bucket-owner-full-control", if_generation=g2)268 k = b.get_key("foo")269 g3 = k.generation270 mg3 = k.metageneration271 self.assertEqual(g3, g2)272 self.assertGreater(mg3, mg2)273 k.set_canned_acl("public-read", if_generation=g3, if_metageneration=mg3)274 def testObjectConditionalSetXmlAcl(self):275 b = self._MakeVersionedBucket()276 k = b.new_key("foo")277 s1 = "test1"278 k.set_contents_from_string(s1)279 g1 = k.generation280 mg1 = k.metageneration281 self.assertEqual(str(mg1), "1")282 acl_xml = (283 '<ACCESSControlList><EntrIes><Entry>' +284 '<Scope type="AllUsers"></Scope><Permission>READ</Permission>' +285 '</Entry></EntrIes></ACCESSControlList>')286 acl = ACL()287 h = handler.XmlHandler(acl, b)288 sax.parseString(acl_xml, h)289 acl = acl.to_xml()290 k.set_xml_acl(acl)291 k = b.get_key("foo")292 g2 = k.generation293 mg2 = k.metageneration294 self.assertEqual(g2, g1)295 self.assertGreater(mg2, mg1)296 with self.assertRaisesRegexp(ValueError, ("Received if_metageneration "297 "argument with no "298 "if_generation argument")):299 k.set_xml_acl(acl, if_metageneration=123)300 with self.assertRaisesRegexp(GSResponseError, VERSION_MISMATCH):301 k.set_xml_acl(acl, if_generation=int(g2) + 1)302 with self.assertRaisesRegexp(GSResponseError, VERSION_MISMATCH):303 k.set_xml_acl(acl, if_generation=g2, if_metageneration=int(mg2) + 1)304 k.set_xml_acl(acl, if_generation=g2)305 k = b.get_key("foo")306 g3 = k.generation307 mg3 = k.metageneration308 self.assertEqual(g3, g2)309 self.assertGreater(mg3, mg2)...
test_versioning.py
Source:test_versioning.py
1# -*- coding: utf-8 -*-2# Copyright (c) 2012, Google, Inc.3# All rights reserved.4#5# Permission is hereby granted, free of charge, to any person obtaining a6# copy of this software and associated documentation files (the7# "Software"), to deal in the Software without restriction, including8# without limitation the rights to use, copy, modify, merge, publish, dis-9# tribute, sublicense, and/or sell copies of the Software, and to permit10# persons to whom the Software is furnished to do so, subject to the fol-11# lowing conditions:12#13# The above copyright notice and this permission notice shall be included14# in all copies or substantial portions of the Software.15#16# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS17# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-18# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT19# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,20# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,21# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS22# IN THE SOFTWARE.23"""Integration tests for GS versioning support."""24from xml import sax25from boto import handler26from boto.gs import acl27from tests.integration.gs.testcase import GSTestCase28class GSVersioningTest(GSTestCase):29 def testVersioningToggle(self):30 b = self._MakeBucket()31 self.assertFalse(b.get_versioning_status())32 b.configure_versioning(True)33 self.assertTrue(b.get_versioning_status())34 b.configure_versioning(False)35 self.assertFalse(b.get_versioning_status())36 def testDeleteVersionedKey(self):37 b = self._MakeVersionedBucket()38 k = b.new_key("foo")39 s1 = "test1"40 k.set_contents_from_string(s1)41 k = b.get_key("foo")42 g1 = k.generation43 s2 = "test2"44 k.set_contents_from_string(s2)45 k = b.get_key("foo")46 g2 = k.generation47 versions = list(b.list_versions())48 self.assertEqual(len(versions), 2)49 self.assertEqual(versions[0].name, "foo")50 self.assertEqual(versions[1].name, "foo")51 generations = [k.generation for k in versions]52 self.assertIn(g1, generations)53 self.assertIn(g2, generations)54 # Delete "current" version and make sure that version is no longer55 # visible from a basic GET call.56 b.delete_key("foo", generation=None)57 self.assertIsNone(b.get_key("foo"))58 # Both old versions should still be there when listed using the versions59 # query parameter.60 versions = list(b.list_versions())61 self.assertEqual(len(versions), 2)62 self.assertEqual(versions[0].name, "foo")63 self.assertEqual(versions[1].name, "foo")64 generations = [k.generation for k in versions]65 self.assertIn(g1, generations)66 self.assertIn(g2, generations)67 # Delete generation 2 and make sure it's gone.68 b.delete_key("foo", generation=g2)69 versions = list(b.list_versions())70 self.assertEqual(len(versions), 1)71 self.assertEqual(versions[0].name, "foo")72 self.assertEqual(versions[0].generation, g1)73 # Delete generation 1 and make sure it's gone.74 b.delete_key("foo", generation=g1)75 versions = list(b.list_versions())76 self.assertEqual(len(versions), 0)77 def testGetVersionedKey(self):78 b = self._MakeVersionedBucket()79 k = b.new_key("foo")80 s1 = "test1"81 k.set_contents_from_string(s1)82 k = b.get_key("foo")83 g1 = k.generation84 o1 = k.get_contents_as_string()85 self.assertEqual(o1, s1)86 s2 = "test2"87 k.set_contents_from_string(s2)88 k = b.get_key("foo")89 g2 = k.generation90 self.assertNotEqual(g2, g1)91 o2 = k.get_contents_as_string()92 self.assertEqual(o2, s2)93 k = b.get_key("foo", generation=g1)94 self.assertEqual(k.get_contents_as_string(), s1)95 k = b.get_key("foo", generation=g2)96 self.assertEqual(k.get_contents_as_string(), s2)97 def testVersionedBucketCannedAcl(self):98 b = self._MakeVersionedBucket()99 k = b.new_key("foo")100 s1 = "test1"101 k.set_contents_from_string(s1)102 k = b.get_key("foo")103 g1 = k.generation104 s2 = "test2"105 k.set_contents_from_string(s2)106 k = b.get_key("foo")107 g2 = k.generation108 acl1g1 = b.get_acl("foo", generation=g1)109 acl1g2 = b.get_acl("foo", generation=g2)110 owner1g1 = acl1g1.owner.id111 owner1g2 = acl1g2.owner.id112 self.assertEqual(owner1g1, owner1g2)113 entries1g1 = acl1g1.entries.entry_list114 entries1g2 = acl1g2.entries.entry_list115 self.assertEqual(len(entries1g1), len(entries1g2))116 b.set_acl("public-read", key_name="foo", generation=g1)117 acl2g1 = b.get_acl("foo", generation=g1)118 acl2g2 = b.get_acl("foo", generation=g2)119 entries2g1 = acl2g1.entries.entry_list120 entries2g2 = acl2g2.entries.entry_list121 self.assertEqual(len(entries2g2), len(entries1g2))122 public_read_entries1 = [e for e in entries2g1 if e.permission == "READ"123 and e.scope.type == acl.ALL_USERS]124 public_read_entries2 = [e for e in entries2g2 if e.permission == "READ"125 and e.scope.type == acl.ALL_USERS]126 self.assertEqual(len(public_read_entries1), 1)127 self.assertEqual(len(public_read_entries2), 0)128 def testVersionedBucketXmlAcl(self):129 b = self._MakeVersionedBucket()130 k = b.new_key("foo")131 s1 = "test1"132 k.set_contents_from_string(s1)133 k = b.get_key("foo")134 g1 = k.generation135 s2 = "test2"136 k.set_contents_from_string(s2)137 k = b.get_key("foo")138 g2 = k.generation139 acl1g1 = b.get_acl("foo", generation=g1)140 acl1g2 = b.get_acl("foo", generation=g2)141 owner1g1 = acl1g1.owner.id142 owner1g2 = acl1g2.owner.id143 self.assertEqual(owner1g1, owner1g2)144 entries1g1 = acl1g1.entries.entry_list145 entries1g2 = acl1g2.entries.entry_list146 self.assertEqual(len(entries1g1), len(entries1g2))147 acl_xml = (148 '<ACCESSControlList><EntrIes><Entry>' +149 '<Scope type="AllUsers"></Scope><Permission>READ</Permission>' +150 '</Entry></EntrIes></ACCESSControlList>')151 aclo = acl.ACL()152 h = handler.XmlHandler(aclo, b)153 sax.parseString(acl_xml, h)154 b.set_acl(aclo, key_name="foo", generation=g1)155 acl2g1 = b.get_acl("foo", generation=g1)156 acl2g2 = b.get_acl("foo", generation=g2)157 entries2g1 = acl2g1.entries.entry_list158 entries2g2 = acl2g2.entries.entry_list159 self.assertEqual(len(entries2g2), len(entries1g2))160 public_read_entries1 = [e for e in entries2g1 if e.permission == "READ"161 and e.scope.type == acl.ALL_USERS]162 public_read_entries2 = [e for e in entries2g2 if e.permission == "READ"163 and e.scope.type == acl.ALL_USERS]164 self.assertEqual(len(public_read_entries1), 1)165 self.assertEqual(len(public_read_entries2), 0)166 def testVersionedObjectCannedAcl(self):167 b = self._MakeVersionedBucket()168 k = b.new_key("foo")169 s1 = "test1"170 k.set_contents_from_string(s1)171 k = b.get_key("foo")172 g1 = k.generation173 s2 = "test2"174 k.set_contents_from_string(s2)175 k = b.get_key("foo")176 g2 = k.generation177 acl1g1 = b.get_acl("foo", generation=g1)178 acl1g2 = b.get_acl("foo", generation=g2)179 owner1g1 = acl1g1.owner.id180 owner1g2 = acl1g2.owner.id181 self.assertEqual(owner1g1, owner1g2)182 entries1g1 = acl1g1.entries.entry_list183 entries1g2 = acl1g2.entries.entry_list184 self.assertEqual(len(entries1g1), len(entries1g2))185 b.set_acl("public-read", key_name="foo", generation=g1)186 acl2g1 = b.get_acl("foo", generation=g1)187 acl2g2 = b.get_acl("foo", generation=g2)188 entries2g1 = acl2g1.entries.entry_list189 entries2g2 = acl2g2.entries.entry_list190 self.assertEqual(len(entries2g2), len(entries1g2))191 public_read_entries1 = [e for e in entries2g1 if e.permission == "READ"192 and e.scope.type == acl.ALL_USERS]193 public_read_entries2 = [e for e in entries2g2 if e.permission == "READ"194 and e.scope.type == acl.ALL_USERS]195 self.assertEqual(len(public_read_entries1), 1)196 self.assertEqual(len(public_read_entries2), 0)197 def testCopyVersionedKey(self):198 b = self._MakeVersionedBucket()199 k = b.new_key("foo")200 s1 = "test1"201 k.set_contents_from_string(s1)202 k = b.get_key("foo")203 g1 = k.generation204 s2 = "test2"205 k.set_contents_from_string(s2)206 b2 = self._MakeVersionedBucket()207 b2.copy_key("foo2", b.name, "foo", src_generation=g1)208 k2 = b2.get_key("foo2")209 s3 = k2.get_contents_as_string()210 self.assertEqual(s3, s1)211 def testKeyGenerationUpdatesOnSet(self):212 b = self._MakeVersionedBucket()213 k = b.new_key("foo")214 self.assertIsNone(k.generation)215 k.set_contents_from_string("test1")216 g1 = k.generation217 self.assertRegexpMatches(g1, r'[0-9]+')218 self.assertEqual(k.metageneration, '1')219 k.set_contents_from_string("test2")220 g2 = k.generation221 self.assertNotEqual(g1, g2)222 self.assertRegexpMatches(g2, r'[0-9]+')223 self.assertGreater(int(g2), int(g1))...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!