Best Testkube code snippet using workerpool.execute
pool.go
Source:pool.go
...6type reqtype int87const (8 shutdown reqtype = 09 startup reqtype = 110 execute reqtype = 211 redrive reqtype = 312 wrap reqtype = 413 dosingle reqtype = 514 stopped uint32 = 015 starting uint32 = 116 running uint32 = 217 stopping uint32 = 418)19type poolreq interface {20 Type() reqtype21}22type asyncreq struct {23 Error error24 done chan interface{}25}26func (req *asyncreq) Wait() {27 for range req.done {28 }29}30func (req *asyncreq) Done() {31 defer func() {32 if r := recover(); r != nil {33 }34 }()35 close(req.done)36}37type startupReq struct {38 asyncreq39}40func (req *startupReq) Type() reqtype {41 return startup42}43type shutdownReq bool44func (req shutdownReq) Type() reqtype {45 return shutdown46}47type executeReq struct {48 asyncreq49 Operations []Operation50 Output chan<- Operation51}52func (req *executeReq) Type() reqtype {53 return execute54}55type doSingleReq struct {56 asyncreq57 Input Operation58}59func (doSingleReq) Type() reqtype {60 return dosingle61}62type wrapStreamReq struct {63 asyncreq64 Input <-chan Operation65 Output chan Operation66}67func (req *wrapStreamReq) Type() reqtype {68 return wrap69}70type redriveReq struct {71 asyncreq72 PreviousAssignee Worker73 Operation Operation74}75func (req *redriveReq) Type() reqtype {76 return redrive77}78// WorkerPool - represents a pool of (possibly heterogeneous) Workers who will read79// messages off of a queue and process them. The idea here is that the messages80// without curation and dispatch may go through several passes before being handled81// by the correct worker.82//83// What the worker pool does, then, is to provide basic controls around spinning up84// the worker, assigning tasks to the worker, keeping the workers up, and shutting85// down the pool when everything is done.86//87// It exposes just three methods: Start, Execute, Shutdown. With these three methods88// we should be able to push tasks to initialise the workers, push tasks to them, and89// reclaim resources when done.90type WorkerPool struct {91 Workers []Worker92 operationChan chan Operation93 reqChan chan poolreq94 state uint3295 execWg *sync.WaitGroup96 closeOnce *sync.Once97 shutdownOnce *sync.Once98}99func (pool *WorkerPool) restore(worker Worker) {100 // TODO: do other restore-y things101 pool.runWorkerAsListener(worker)102}103func (pool *WorkerPool) retryOperation(op Operation, previousAssignee Worker) {104 req := redriveReq{105 Operation: op,106 PreviousAssignee: previousAssignee,107 }108 pool.reqChan <- &req109}110func (pool *WorkerPool) assign(worker Worker, op Operation) {111 retry, err := worker.Process(op)112 if err == nil {113 op.Done()114 return115 }116 if retry {117 op.IncrementTry()118 go pool.retryOperation(op, worker)119 } else {120 worker.HandleError(err, op)121 op.Done()122 }123}124func (pool *WorkerPool) runWorkerAsListener(worker Worker) {125 // In case there is a panic, let's restart the worker126 // but otherwise, just clean up the worker, however it127 // knows how128 var currentOp Operation129 defer func() {130 if currentOp != nil {131 currentOp.Done()132 }133 worker.Cleanup()134 if r := recover(); r != nil {135 pool.restore(worker)136 }137 }()138 // process each operation received operation by assigning139 // to this worker140 for currentOp = range pool.operationChan {141 pool.assign(worker, currentOp)142 }143}144func (pool *WorkerPool) initWorker(worker Worker) error {145 initErr := worker.Init()146 if initErr == nil {147 return nil148 }149 // if there is any initialisation errors,150 // we should shutdown the pool immediately so it can't151 // be used152 go pool.Shutdown()153 return errors.Wrap(initErr, "unable to initialise worker")154}155func (pool *WorkerPool) start() error {156 // if the pool is already running, don't do anything157 if pool.state == running || pool.state == starting {158 return nil159 }160 pool.state = starting161 // create brand new operations channel and close-once sema162 pool.operationChan = make(chan Operation)163 pool.closeOnce = &sync.Once{}164 pool.execWg = &sync.WaitGroup{}165 for _, worker := range pool.Workers {166 if initErr := pool.initWorker(worker); initErr != nil {167 return initErr168 }169 go pool.runWorkerAsListener(worker)170 }171 return nil172}173func waitTillCompleted(wg *sync.WaitGroup, op Operation, output chan<- Operation) {174 defer func() {175 if recover() != nil {176 op.Done()177 output <- op178 wg.Done()179 }180 }()181 op.Wait()182 output <- op183 wg.Done()184}185func (pool *WorkerPool) enqueue(ops []Operation, output chan<- Operation) error {186 if pool.state != running {187 close(output)188 return errors.New("error: enqueuing messages on non-running actor pool")189 }190 wg := sync.WaitGroup{}191 for _, op := range ops {192 pool.operationChan <- op193 wg.Add(1)194 go waitTillCompleted(&wg, op, output)195 }196 go func() {197 wg.Wait()198 close(output)199 }()200 return nil201}202func (pool *WorkerPool) do(op Operation) error {203 if pool.state != running {204 return errors.New("error: enqueuing message on non-running actor pool")205 }206 pool.operationChan <- op207 return nil208}209func (pool *WorkerPool) shutdown() {210 // if the pool is already shutdown, just return211 if pool.state == stopped {212 return213 }214 pool.state = stopping215 // otherwise, close the operation channel in a once216 pool.closeOnce.Do(func() {217 close(pool.operationChan)218 })219}220func (pool *WorkerPool) redrive(op Operation, previousWorker Worker) {221 if pool.state != running {222 pool.assign(previousWorker, op)223 }224 pool.operationChan <- op225}226func (pool *WorkerPool) listenToRequests() {227 for req := range pool.reqChan {228 switch v := req.(type) {229 case *startupReq:230 v.Error = pool.start()231 v.Done()232 if v.Error == nil {233 pool.state = running234 }235 case *executeReq:236 pool.execWg.Add(1)237 go func() {238 v.Error = pool.enqueue(v.Operations, v.Output)239 v.Done()240 pool.execWg.Done()241 }()242 case *doSingleReq:243 pool.execWg.Add(1)244 go func() {245 v.Error = pool.do(v.Input)246 v.Done()247 pool.execWg.Done()248 }()249 case *redriveReq:250 pool.redrive(v.Operation, v.PreviousAssignee)251 case *wrapStreamReq:252 pool.execWg.Add(1)253 go func() {254 outStream, err := pool.wrapStream(v.Input, pool.execWg)255 v.Error = err256 v.Output = outStream257 v.Done()258 }()259 case shutdownReq:260 go func(pool *WorkerPool) {261 pool.shutdownOnce.Do(func() {262 // wait till all the previous executes have completed263 pool.execWg.Wait()264 // then stop everything265 pool.shutdown()266 pool.state = stopped267 })268 }(pool)269 }270 }271}272// NewPool - creates a new pool of workers. By passing it a list of273// workers, each will be initialised, and registered to receive messages274// on a queue, and restarted when some error occurs. Each will be shutdown275// appropriately when the shutdown sequence is called.276func NewPool(workers []Worker) *WorkerPool {277 pool := WorkerPool{278 Workers: workers,279 reqChan: make(chan poolreq),280 state: stopped,281 execWg: &sync.WaitGroup{},282 shutdownOnce: &sync.Once{},283 }284 go pool.listenToRequests()285 return &pool286}287// Start - start the pool by setting up the workers to listen to the288// requests. A pool that isn't started cannot process any requests.289func (pool *WorkerPool) Start() error {290 req := &startupReq{291 asyncreq{292 done: make(chan interface{}),293 },294 }295 pool.reqChan <- req296 // wait till the request is done297 req.Wait()298 return req.Error299}300// Shutdown - shuts down the actor pool so that (some of) its resources301// can be reused, and the workers will be notified to shutdown.302//303// This will prevent all other requests from being executed. This does304// not affect any requests that have begun processing.305func (pool *WorkerPool) Shutdown() {306 pool.reqChan <- shutdownReq(true)307}308// Execute - executes a collection of operations (requests). The return309// is a channel that is closed when all the operations have either been310// successfully processed or failed.311//312// If an error is returned, the channel is closed.313func (pool *WorkerPool) Execute(ops []Operation) (<-chan Operation, error) {314 output := make(chan Operation, len(ops))315 executeReq := executeReq{316 Operations: ops,317 Output: output,318 asyncreq: asyncreq{319 done: make(chan interface{}),320 },321 }322 pool.reqChan <- &executeReq323 executeReq.Wait()324 return output, executeReq.Error325}326// Do - executes a single operation asynchronously. This returns an error327// if the worker pool is not running (e.g. has not started or is shutdown)328// because then the channels are all closed.329func (pool *WorkerPool) Do(op Operation) error {330 doSingleReq := doSingleReq{331 Input: op,332 asyncreq: asyncreq{333 done: make(chan interface{}),334 },335 }336 pool.reqChan <- &doSingleReq337 doSingleReq.Wait()338 return doSingleReq.Error339}340func (pool *WorkerPool) pipe(inStream <-chan Operation, outStream chan Operation, wg *sync.WaitGroup) {341 taskWg := sync.WaitGroup{}342 for op := range inStream {343 // execute the op344 doErr := pool.Do(op)345 if doErr != nil {346 // TODO: do some error handling here347 op.Done()348 }349 taskWg.Add(1)350 // wait till the op is done, and then move it to the outstream351 go func(op Operation, tW *sync.WaitGroup) {352 op.Wait()353 outStream <- op354 tW.Done()355 }(op, &taskWg)356 }357 // even if the inStream is closed, wait till all the358 // tasks have completed before returning359 taskWg.Wait()360 // at this point, mark the whole execute as done361 wg.Done()362 // close the outstream363 close(outStream)364}365func (pool *WorkerPool) wrapStream(inStream <-chan Operation, wg *sync.WaitGroup) (chan Operation, error) {366 outStream := make(chan Operation)367 if pool.state != running {368 close(outStream)369 return outStream, errors.New("pool not running; stream will never be processed")370 }371 go pool.pipe(inStream, outStream, wg)372 return outStream, nil373}374// Wrap - wraps an input stream into another output stream. The idea here...
procon_workerpool.go
Source:procon_workerpool.go
1//https://github.com/syafdia/go-exercise2package procon_workerpool3import(4 "fmt"5)6// T is a type alias to accept any type.7type T = interface{}8// Executor is a type alias for Worker Pool parameter.9type Executor = func() (T, error)10// Task will hold wrapped function which will11// be processed by Worker Pool.12type Task struct {13 ID string14 Result T15 Err error16 executor Executor17}18func NewTask(id string, executor Executor) *Task {19 return &Task{20 ID: id,21 executor: executor,22 }23}24// Execute will run wrapped function on Task instance25// and set the Result & Error property.26func (t *Task) Execute() {27 t.Result, t.Err = t.executor()28}29// WorkerPool is a contract for Worker Pool implementation30type WorkerPool interface {31 Run()32 AddTasks(tasks []*Task)33 GetProcessedTask() chan *Task34 GetTotalQueuedTask() int35}36type workerPool struct {37 maxWorker int38 taskC chan *Task39 queuedTaskC chan *Task40 processedTaskC chan *Task41}42// NewWorkerPool will create an instance of WorkerPool.43func NewWorkerPool(maxWorker int) WorkerPool {44 wp := &workerPool{45 maxWorker: maxWorker,46 queuedTaskC: make(chan *Task),47 processedTaskC: make(chan *Task),48 }49 return wp50}51func (wp *workerPool) Run() {52 wp.run() 53}54func (wp *workerPool) AddTask(id string, executor Executor) {55 go func() {56 task := &Task{57 ID: id,58 executor: executor,59 }60 wp.queuedTaskC <- task61 }()62 fmt.Printf("[WorkerPool] Task %s has been added", id)63}64func (wp *workerPool) AddTasks(tasks []*Task) {65 go func() {66 for _, task := range tasks {67 wp.queuedTaskC <- task68 }69 }()70}71func (wp *workerPool) GetTotalQueuedTask() int {72 return len(wp.queuedTaskC)73}74func (wp *workerPool) GetProcessedTask() chan *Task {75 return wp.processedTaskC76}77func (wp *workerPool) run() {78 for i := 0; i < wp.maxWorker; i++ {79 go func(workerID int) {80 for task := range wp.queuedTaskC {81 fmt.Printf("[WorkerPool] Worker %d start task %s \n", workerID, task.ID)82 task.Execute()83 wp.processedTaskC <- task84 fmt.Printf("[WorkerPool] Worker %d finished task %s \n", workerID, task.ID)85 }86 }(i + 1)87 }88}...
workqueue.go
Source:workqueue.go
1package workqueue2type Job interface {3 Execute() error // execute the job4 Report(err error) // report the error status of Execute()5}6type worker struct {7 workerPool chan chan Job8 jobQueue chan Job9 quit chan struct{}10}11func newWorker(quit chan struct{}, workerPool chan chan Job) *worker {12 worker := &worker{13 workerPool: workerPool,14 jobQueue: make(chan Job),15 }16 go worker.run(quit)17 return worker...
execute
Using AI Code Generation
1import (2func main() {3 workerPool := workerpool.NewWorkerPool(2, 2)4 job := func() {5 fmt.Println("I am a job")6 }7 workerPool.Submit(job)
execute
Using AI Code Generation
1func main() {2 pool := workerpool.New(3)3 pool.Execute(func() {4 fmt.Println("Task 1")5 })6 pool.Execute(func() {7 fmt.Println("Task 2")8 })9 pool.Execute(func() {10 fmt.Println("Task 3")11 })
execute
Using AI Code Generation
1import (2var (3func main() {4 fmt.Println("Number of cores available in the machine: ", runtime.NumCPU())5 workerPool = NewWorkerPool(4)6 workerPool.Start()7 workerPool.Execute(func() {8 for i := 0; i < 10; i++ {9 time.Sleep(1 * time.Second)10 fmt.Println("Worker 1 is working")11 }12 })13 workerPool.Execute(func() {14 for i := 0; i < 10; i++ {15 time.Sleep(1 * time.Second)16 fmt.Println("Worker 2 is working")17 }18 })19 workerPool.Execute(func() {20 for i := 0; i < 10; i++ {21 time.Sleep(1 * time.Second)22 fmt.Println("Worker 3 is working")23 }24 })25 workerPool.Stop()26}
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!