How to use the parallelize method in LocalStack

Best Python code snippet using localstack_python

RDDOpSample.py

Source: RDDOpSample.py (GitHub)


...
    for v in values:
        print("Value Side Effect: %s" % v)

class RDDOpSample():
    def doCollect(self, sc):
        rdd = sc.parallelize(range(1, 11))
        result = rdd.collect()
        print(result)

    def doCount(self, sc):
        rdd = sc.parallelize(range(1, 11))
        result = rdd.count()
        print(result)

    def doMap(self, sc):
        rdd1 = sc.parallelize(range(1, 6))
        rdd2 = rdd1.map(lambda v: v + 1)
        print(rdd2.collect())

    def doFlatMap(self, sc):
        rdd1 = sc.parallelize(["apple,orange", "grape,apple,mango", "blueberry,tomato,orange"])
        rdd2 = rdd1.flatMap(lambda s: s.split(","))
        print(rdd2.collect())

    def doMapPartitions(self, sc):
        rdd1 = sc.parallelize(range(1, 11), 3)
        rdd2 = rdd1.mapPartitions(increase)
        print(rdd2.collect())

    def doMapPartitionsWithIndex(self, sc):
        rdd1 = sc.parallelize(range(1, 11), 3)
        rdd2 = rdd1.mapPartitionsWithIndex(increaseWithIndex)
        print(rdd2.collect())

    def doMapValues(self, sc):
        rdd1 = sc.parallelize(["a", "b", "c"])
        rdd2 = rdd1.map(lambda v: (v, 1))
        rdd3 = rdd2.mapValues(lambda i: i + 1)
        print(rdd3.collect())

    def doFlatMapValues(self, sc):
        rdd1 = sc.parallelize([(1, "a,b"), (2, "a,c"), (1, "d,e")])
        rdd2 = rdd1.flatMapValues(lambda s: s.split(","))
        print(rdd2.collect())

    def doZip(self, sc):
        rdd1 = sc.parallelize(["a", "b", "c"])
        rdd2 = sc.parallelize([1, 2, 3])
        result = rdd1.zip(rdd2)
        print(result.collect())

    def doGroupBy(self, sc):
        rdd1 = sc.parallelize(range(1, 11))
        rdd2 = rdd1.groupBy(lambda v: "even" if v % 2 == 0 else "odd")
        for x in rdd2.collect():
            print(x[0], list(x[1]))

    def doGroupByKey(self, sc):
        rdd1 = sc.parallelize(["a", "b", "c", "b", "c"]).map(lambda v: (v, 1))
        rdd2 = rdd1.groupByKey()
        for x in rdd2.collect():
            print(x[0], list(x[1]))

    def doCogroup(self, sc):
        rdd1 = sc.parallelize([("k1", "v1"), ("k2", "v2"), ("k1", "v3")])
        rdd2 = sc.parallelize([("k1", "v4")])
        result = rdd1.cogroup(rdd2)
        for x in result.collect():
            print(x[0], list(x[1][0]), list(x[1][1]))

    def doDistinct(self, sc):
        rdd = sc.parallelize([1, 2, 3, 1, 2, 3, 1, 2, 3])
        result = rdd.distinct()
        print(result.collect())

    def doCartesian(self, sc):
        rdd1 = sc.parallelize([1, 2, 3])
        rdd2 = sc.parallelize(["a", "b", "c"])
        result = rdd1.cartesian(rdd2)
        print(result.collect())

    def doSubtract(self, sc):
        rdd1 = sc.parallelize(["a", "b", "c", "d", "e"])
        rdd2 = sc.parallelize(["d", "e"])
        result = rdd1.subtract(rdd2)
        print(result.collect())

    def doUnion(self, sc):
        rdd1 = sc.parallelize(["a", "b", "c"])
        rdd2 = sc.parallelize(["d", "e", "f"])
        result = rdd1.union(rdd2)
        print(result.collect())

    def doIntersection(self, sc):
        rdd1 = sc.parallelize(["a", "a", "b", "c"])
        rdd2 = sc.parallelize(["a", "a", "c", "c"])
        result = rdd1.intersection(rdd2)
        print(result.collect())

    def doJoin(self, sc):
        rdd1 = sc.parallelize(["a", "b", "c", "d", "e"]).map(lambda v: (v, 1))
        rdd2 = sc.parallelize(["b", "c"]).map(lambda v: (v, 2))
        result = rdd1.join(rdd2)
        print(result.collect())

    def doLeftOuterJoin(self, sc):
        rdd1 = sc.parallelize(["a", "b", "c"]).map(lambda v: (v, 1))
        rdd2 = sc.parallelize(["b", "c"]).map(lambda v: (v, 2))
        result1 = rdd1.leftOuterJoin(rdd2)
        result2 = rdd1.rightOuterJoin(rdd2)
        print("Left: %s" % result1.collect())
        print("Right: %s" % result2.collect())

    def doSubtractByKey(self, sc):
        rdd1 = sc.parallelize(["a", "b"]).map(lambda v: (v, 1))
        rdd2 = sc.parallelize(["b"]).map(lambda v: (v, 1))
        result = rdd1.subtractByKey(rdd2)
        print(result.collect())

    def doReduceByKey(self, sc):
        rdd = sc.parallelize(["a", "b", "b"]).map(lambda v: (v, 1))
        result = rdd.reduceByKey(lambda v1, v2: v1 + v2)
        print(result.collect())

    def doFoldByKey(self, sc):
        rdd = sc.parallelize(["a", "b", "b"]).map(lambda v: (v, 1))
        result = rdd.foldByKey(0, lambda v1, v2: v1 + v2)
        print(result.collect())

    def doCombineByKey(self, sc):
        rdd = sc.parallelize([("Math", 100), ("Eng", 80), ("Math", 50), ("Eng", 70), ("Eng", 90)])
        result = rdd.combineByKey(lambda v: createCombiner(v), lambda c, v: mergeValue(c, v),
                                  lambda c1, c2: mergeCombiners(c1, c2))
        print('Math', result.collectAsMap()['Math'], 'Eng', result.collectAsMap()['Eng'])

    def doAggregateByKey(self, sc):
        rdd = sc.parallelize([("Math", 100), ("Eng", 80), ("Math", 50), ("Eng", 70), ("Eng", 90)])
        result = rdd.aggregateByKey(Record(0, 0), lambda c, v: mergeValue(c, v), lambda c1, c2: mergeCombiners(c1, c2))
        print('Math', result.collectAsMap()['Math'], 'Eng', result.collectAsMap()['Eng'])

    def doPipe(self, sc):
        rdd = sc.parallelize(["1,2,3", "4,5,6", "7,8,9"])
        result = rdd.pipe("cut -f 1,3 -d ,")
        print(result.collect())

    def doCoalesceAndRepartition(self, sc):
        rdd1 = sc.parallelize(list(range(1, 11)), 10)
        rdd2 = rdd1.coalesce(5)
        rdd3 = rdd2.repartition(10)
        print("partition size: %d" % rdd1.getNumPartitions())
        print("partition size: %d" % rdd2.getNumPartitions())
        print("partition size: %d" % rdd3.getNumPartitions())

    def doRepartitionAndSortWithinPartitions(self, sc):
        data = [random.randrange(1, 100) for i in range(0, 10)]
        rdd1 = sc.parallelize(data).map(lambda v: (v, "-"))
        rdd2 = rdd1.repartitionAndSortWithinPartitions(3, lambda x: x)
        rdd2.foreachPartition(lambda values: print(list(values)))

    def doPartitionBy(self, sc):
        rdd1 = sc.parallelize([("apple", 1), ("mouse", 1), ("monitor", 1)], 5)
        rdd2 = rdd1.partitionBy(3)
        print("rdd1: %d, rdd2: %d" % (rdd1.getNumPartitions(), rdd2.getNumPartitions()))

    def doFilter(self, sc):
        rdd1 = sc.parallelize(range(1, 6))
        rdd2 = rdd1.filter(lambda i: i > 2)
        print(rdd2.collect())

    def doSortByKey(self, sc):
        rdd = sc.parallelize([("q", 1), ("z", 1), ("a", 1)])
        result = rdd.sortByKey()
        print(result.collect())

    def doKeysAndValues(self, sc):
        rdd = sc.parallelize([("k1", "v1"), ("k2", "v2"), ("k3", "v3")])
        print(rdd.keys().collect())
        print(rdd.values().collect())

    def doSample(self, sc):
        rdd = sc.parallelize(range(1, 101))
        result1 = rdd.sample(False, 0.5, 100)
        result2 = rdd.sample(True, 1.5, 100)
        print(result1.take(5))
        print(result2.take(5))

    def doFirst(self, sc):
        rdd = sc.parallelize([5, 4, 1])
        result = rdd.first()
        print(result)

    def doTake(self, sc):
        rdd = sc.parallelize(range(1, 100))
        result = rdd.take(5)
        print(result)

    def doTakeSample(self, sc):
        rdd = sc.parallelize(range(1, 100))
        result = rdd.takeSample(False, 20)
        print(len(result))

    def doCountByValue(self, sc):
        rdd = sc.parallelize([1, 1, 2, 3, 3])
        result = rdd.countByValue()
        for k, v in result.items():
            print(k, "->", v)

    def doReduce(self, sc):
        rdd = sc.parallelize(range(1, 11), 3)
        result = rdd.reduce(lambda v1, v2: v1 + v2)
        print(result)

    def doFold(self, sc):
        rdd = sc.parallelize(range(1, 11), 3)
        result = rdd.fold(0, lambda v1, v2: v1 + v2)
        print(result)

    def doAggregate(self, sc):
        rdd = sc.parallelize([100, 80, 75, 90, 95])
        result = rdd.aggregate(Record(0, 0), seqOp, combOp)
        print(result)

    def doSum(self, sc):
        rdd = sc.parallelize(range(1, 11))
        result = rdd.sum()
        print(result)

    def doForeach(self, sc):
        rdd = sc.parallelize(range(1, 11))
        result = rdd.foreach(lambda v: print("Value Side Effect: %s" % v))

    def doForeachPartition(self, sc):
        rdd = sc.parallelize(range(1, 11), 3)
        result = rdd.foreachPartition(sideEffect)

    def doDebugString(self, sc):
        rdd1 = sc.parallelize(range(1, 100), 10)
        rdd2 = rdd1.map(lambda v: v * 2)
        rdd3 = rdd2.map(lambda v: v + 1)
        rdd4 = rdd3.coalesce(2)
        print(rdd4.toDebugString())

    def doCache(self, sc):
        rdd = sc.parallelize(range(1, 100), 10)
        rdd.cache()
        rdd.persist(StorageLevel.MEMORY_ONLY)
        print(rdd.persist().is_cached)

    def doGetPartitions(self, sc):
        rdd = sc.parallelize(range(1, 100), 10)
        print(rdd.getNumPartitions())

    def saveAndLoadTextFile(self, sc):
        rdd = sc.parallelize(range(1, 1000), 3)
        codec = "org.apache.hadoop.io.compress.GzipCodec"
        # save
        rdd.saveAsTextFile("<path_to_save>/sub1")
        # save (gzip)
        rdd.saveAsTextFile("<path_to_save>/sub2", codec)
        # load
        rdd2 = sc.textFile("<path_to_save>/sub1")
        print(rdd2.take(10))

    def saveAndLoadObjectFile(self, sc):
        rdd = sc.parallelize(range(1, 1000), 3)
        # save
        # change the path below to an actual save path before testing
        rdd.saveAsPickleFile("data/sample/saveAsObjectFile/python")
        # load
        # change the path below to an actual save path before testing
        rdd2 = sc.pickleFile("data/sample/saveAsObjectFile/python")
        print(rdd2.take(10))

    def saveAndLoadSequenceFile(self, sc):
        # change the path below to an actual save path before testing
        path = "data/sample/saveAsSeqFile/python"
        outputFormatClass = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"
        inputFormatClass = "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat"
        keyClass = "org.apache.hadoop.io.Text"
        valueClass = "org.apache.hadoop.io.IntWritable"
        conf = "org.apache.hadoop.conf.Configuration"
        rdd1 = sc.parallelize(["a", "b", "c", "b", "c"])
        rdd2 = rdd1.map(lambda x: (x, 1))
        # save
        rdd2.saveAsNewAPIHadoopFile(path, outputFormatClass, keyClass, valueClass)
        # load
        rdd3 = sc.newAPIHadoopFile(path, inputFormatClass, keyClass, valueClass)
        for k, v in rdd3.collect():
            print(k, v)

    def testBroadcaset(self, sc):
        bu = sc.broadcast(set(["u1", "u2"]))
        rdd = sc.parallelize(["u1", "u3", "u3", "u4", "u5", "u6"], 3)
        result = rdd.filter(lambda v: v in bu.value)
        print(result.collect())

if __name__ == "__main__":
    conf = SparkConf()
    conf.set("spark.driver.host", "127.0.0.1")
    sc = SparkContext(master="local[*]", appName="RDDOpSample", conf=conf)
    obj = RDDOpSample()
    # [How to run the examples] Uncomment the example you want below and run it!
    # ex) obj.testBroadcaset(sc)
    # obj.doCollect(sc)
    # obj.doCount(sc)
    # obj.doMap(sc)
    # obj.doFlatMap(sc)
    # obj.doMapPartitions(sc)
...
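
The listing above is truncated at both ends; the imports (SparkConf, SparkContext, StorageLevel, random) and the helper functions it references (increase, increaseWithIndex, createCombiner, mergeValue, mergeCombiners, Record, seqOp, combOp, sideEffect) live in the omitted portion of the file. For a quick standalone check of the parallelize pattern itself, here is a minimal sketch, assuming PySpark is installed locally (for example via pip install pyspark); the app name and variable names are illustrative only and not taken from the listing.

# Minimal sketch: SparkContext.parallelize turns a local Python collection
# into a distributed RDD. Assumes a local PySpark installation.
from pyspark import SparkConf, SparkContext

conf = SparkConf().set("spark.driver.host", "127.0.0.1")
sc = SparkContext(master="local[*]", appName="ParallelizeSketch", conf=conf)

# The optional second argument is the number of partitions.
rdd = sc.parallelize(range(1, 11), 3)

print(rdd.getNumPartitions())              # 3
print(rdd.map(lambda v: v * 2).collect())  # [2, 4, ..., 20]
print(rdd.reduce(lambda a, b: a + b))      # 55

sc.stop()

Transformations such as map and filter are lazy; nothing is computed until an action such as collect, reduce, or count runs, which is why every method in the listing above ends with an action.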


parallelize_repy.py

Source: parallelize_repy.py (GitHub)


# -*- coding: utf-8 -*-
### Automatically generated by repyhelper.py ### C:\Dropbox\uni\y1p2\dist\lab\demokit\parallelize.repy
### THIS FILE WILL BE OVERWRITTEN!
### DO NOT MAKE CHANGES HERE, INSTEAD EDIT THE ORIGINAL SOURCE FILE
###
### If changes to the src aren't propagating here, try manually deleting this file.
### Deleting this file forces regeneration of a repy translation

from repyportability import *
import repyhelper
mycontext = repyhelper.get_shared_context()

callfunc = 'import'
callargs = []

"""
Author: Justin Cappos
Module: A parallelization module. It performs actions in parallel to make it
        easy for a user to call a function with a list of tasks.
Start date: November 11th, 2008

This module is adapted from code in seash which had similar functionality.

NOTE (for the programmer using this module). It's really important to
write concurrency safe code for the functions they provide us. It will not
work to write:

def foo(...):
  mycontext['count'] = mycontext['count'] + 1

YOU MUST PUT A LOCK AROUND SUCH ACCESSES.
"""

# I use this to get unique identifiers.
repyhelper.translate_and_import('uniqueid.repy')


class ParallelizeError(Exception):
  """An error occurred when operating on a parallelized task"""


# This has information about all of the different parallel functions.
# The keys are unique integers and the entries look like this:
#   {'abort':False, 'callfunc':callfunc, 'callargs':callargs,
#    'targetlist':targetlist, 'availabletargetpositions':positionlist,
#    'runninglist':runninglist, 'result':result}
#
# abort is used to determine if future events should be aborted.
# callfunc is the function to call
# callargs are extra arguments to pass to the function
# targetlist is the list of items to call the function with
# runninglist is used to track which events are executing
# result is a dictionary that contains information about completed function.
# The format of result is:
#   {'exception':list of tuples with (target, exception string),
#    'aborted':list of targets,
#    'returned':list of tuples with (target, return value)}
#
parallelize_info_dict = {}


def parallelize_closefunction(parallelizehandle):
  """
  <Purpose>
    Clean up the state created after calling parallelize_initfunction.

  <Arguments>
    parallelizehandle:
      The handle returned by parallelize_initfunction

  <Exceptions>
    None

  <Side Effects>
    Will try to abort future functions if possible

  <Returns>
    True if the parallelizehandle was recognized or False if the handle is
    invalid or already closed.
  """

  # There is no sense trying to check then delete, since there may be a race
  # with multiple calls to this function.
  try:
    del parallelize_info_dict[parallelizehandle]
  except KeyError:
    return False
  else:
    return True


def parallelize_abortfunction(parallelizehandle):
  """
  <Purpose>
    Cause pending events for a function to abort. Events will finish
    processing their current event.

  <Arguments>
    parallelizehandle:
      The handle returned by parallelize_initfunction

  <Exceptions>
    ParallelizeError is raised if the handle is unrecognized

  <Side Effects>
    None

  <Returns>
    True if the function was not previously aborting and is now, or False if
    the function was already set to abort before the call.
  """

  try:
    if parallelize_info_dict[parallelizehandle]['abort'] == False:
      parallelize_info_dict[parallelizehandle]['abort'] = True
      return True
    else:
      return False
  except KeyError:
    raise ParallelizeError("Cannot abort the parallel execution of a non-existent handle:"+str(parallelizehandle))


def parallelize_isfunctionfinished(parallelizehandle):
  """
  <Purpose>
    Indicate if a function is finished

  <Arguments>
    parallelizehandle:
      The handle returned by parallelize_initfunction

  <Exceptions>
    ParallelizeError is raised if the handle is unrecognized

  <Side Effects>
    None

  <Returns>
    True if the function has finished, False if it still has events running
  """

  try:
    if parallelize_info_dict[parallelizehandle]['runninglist']:
      return False
    else:
      return True
  except KeyError:
    raise ParallelizeError("Cannot get status for the parallel execution of a non-existent handle:"+str(parallelizehandle))


def parallelize_getresults(parallelizehandle):
  """
  <Purpose>
    Get information about a parallelized function

  <Arguments>
    parallelizehandle:
      The handle returned by parallelize_initfunction

  <Exceptions>
    ParallelizeError is raised if the handle is unrecognized

  <Side Effects>
    None

  <Returns>
    A dictionary with the results. The format is
    {'exception':list of tuples with (target, exception string),
     'aborted':list of targets, 'returned':list of tuples with (target,
     return value)}
  """

  try:
    # I copy so that the user doesn't have to deal with the fact I may still
    # be modifying it
    return parallelize_info_dict[parallelizehandle]['result'].copy()
  except KeyError:
    raise ParallelizeError("Cannot get results for the parallel execution of a non-existent handle:"+str(parallelizehandle))


def parallelize_initfunction(targetlist, callerfunc, concurrentevents=5, *extrafuncargs):
  """
  <Purpose>
    Call a function with each argument in a list in parallel

  <Arguments>
    targetlist:
      The list of arguments the function should be called with. Each
      argument is passed once to the function. Items may appear in the
      list multiple times

    callerfunc:
      The function to call

    concurrentevents:
      The number of events to issue concurrently (default 5). No more
      than len(targetlist) events will be concurrently started.

    extrafuncargs:
      Extra arguments the function should be called with (every function
      is passed the same extra args).

  <Exceptions>
    ParallelizeError is raised if there isn't at least one free event.
    However, if there aren't at least concurrentevents number of free events,
    this is not an error (instead this is reflected in parallelize_getstatus)
    in the status information.

  <Side Effects>
    Starts events, etc.

  <Returns>
    A handle used for status information, etc.
  """

  parallelizehandle = uniqueid_getid()

  # set up the dict locally one line at a time to avoid a ginormous line
  handleinfo = {}
  handleinfo['abort'] = False
  handleinfo['callfunc'] = callerfunc
  handleinfo['callargs'] = extrafuncargs
  # make a copy of target list because
  handleinfo['targetlist'] = targetlist[:]
  handleinfo['availabletargetpositions'] = range(len(handleinfo['targetlist']))
  handleinfo['result'] = {'exception':[],'returned':[],'aborted':[]}
  handleinfo['runninglist'] = []

  parallelize_info_dict[parallelizehandle] = handleinfo

  # don't start more threads than there are targets (duh!)
  threads_to_start = min(concurrentevents, len(handleinfo['targetlist']))

  for workercount in range(threads_to_start):
    # we need to append the workercount here because we can't return until
    # this is scheduled without having race conditions
    parallelize_info_dict[parallelizehandle]['runninglist'].append(workercount)
    try:
      settimer(0.0, parallelize_execute_function, (parallelizehandle, workercount))
    except:
      # If I'm out of resources, stop
      # remove this worker (they didn't start)
      parallelize_info_dict[parallelizehandle]['runninglist'].remove(workercount)
      if not parallelize_info_dict[parallelizehandle]['runninglist']:
        parallelize_closefunction(parallelizehandle)
        raise Exception, "No events available!"
      break

  return parallelizehandle


def parallelize_execute_function(handle, myid):
  # This is internal only. It's used to execute the user function...

  # No matter what, an exception in me should not propagate up! Otherwise,
  # we might result in the program's termination!
  try:
    while True:
      # separate this from below functionality to minimize scope of try block
      thetargetlist = parallelize_info_dict[handle]['targetlist']
      try:
        mytarget = thetargetlist.pop()
      except IndexError:
        # all items are gone, let's return
        return

      # if they want us to abort, put this in the aborted list
      if parallelize_info_dict[handle]['abort']:
        parallelize_info_dict[handle]['result']['aborted'].append(mytarget)
      else:
        # otherwise process this normally

        # limit the scope of the below try block...
        callfunc = parallelize_info_dict[handle]['callfunc']
        callargs = parallelize_info_dict[handle]['callargs']

        try:
          retvalue = callfunc(mytarget, *callargs)
        except Exception, e:
          # always log on error. We need to report what happened
          parallelize_info_dict[handle]['result']['exception'].append((mytarget, str(e)))
        else:
          # success, add it to the dict...
          parallelize_info_dict[handle]['result']['returned'].append((mytarget, retvalue))

  except KeyError:
    # A KeyError is normal if they've closed the handle
    return
  except Exception, e:
    print 'Internal Error: Exception in parallelize_execute_function', e
  finally:
    # remove my entry from the list of running worker threads...
    try:
      parallelize_info_dict[handle]['runninglist'].remove(myid)
    except (ValueError, KeyError):
      pass

...
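
Note that this file is a repyhelper-generated Python 2 translation (it uses the "raise Exception, ..." and print-statement syntax), so it must run under the Seattle repy runtime on Python 2. As a rough illustration of how its public functions fit together, here is a hypothetical usage sketch; it assumes repyportability/repyhelper are installed, that the file above is importable as parallelize_repy, and the double() worker function is made up for this example.

# Hypothetical usage sketch for the parallelize module above.
# Assumes the Seattle repy runtime (Python 2) and that this file is
# importable as parallelize_repy; double() is illustrative only.
from parallelize_repy import (parallelize_initfunction,
                              parallelize_isfunctionfinished,
                              parallelize_getresults,
                              parallelize_closefunction)
import time

def double(value):
    # Worker functions must be concurrency-safe, as the module docstring warns.
    return value * 2

# Start up to 3 concurrent events, one per item in the target list.
handle = parallelize_initfunction([1, 2, 3, 4, 5], double, 3)

# Poll until the worker events have drained the target list.
while not parallelize_isfunctionfinished(handle):
    time.sleep(0.1)

results = parallelize_getresults(handle)
# results has the shape documented above:
# {'returned': [(target, return value), ...], 'exception': [...], 'aborted': [...]}
print(results['returned'])

parallelize_closefunction(handle)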


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run LocalStack automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.
