Best Testkube code snippet using v1.streamLogsFromJob
main.go
Source:main.go
1package main2import (3 "encoding/json"4 "errors"5 "fmt"6 "github.com/auth0/go-jwt-middleware"7 "github.com/codegangsta/negroni"8 "github.com/dgrijalva/jwt-go"9 "github.com/dummy-ai/mvp/master-server/models"10 "github.com/go-redis/redis"11 "github.com/gorilla/mux"12 "github.com/jinzhu/gorm"13 "github.com/levigross/grequests"14 "gopkg.in/yaml.v2"15 kube "k8s.io/client-go/kubernetes"16 "k8s.io/client-go/tools/clientcmd"17 "net/http"18 "os"19 "sort"20 "strconv"21 "time"22)23var db *gorm.DB24var kvstore *redis.Client25var kubeClient *kube.Clientset26func printRequest(r *http.Request, args ...interface{}) error {27 if r == nil {28 return errors.New("Cannot print nil request")29 }30 fmt.Printf("%s %s %s %s %v\n", r.RemoteAddr, r.Method, r.URL.Path, r.Proto, args)31 return nil32}33func AuthenticationError(w http.ResponseWriter, r *http.Request, err string) {34 fmt.Println("header", r.Header)35 fmt.Println("[Authentication] Error: ", err)36 w.WriteHeader(400)37 w.Write([]byte("Authorization Error: " + err))38}39func CreateClient(kubeconfig string) *kube.Clientset {40 config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)41 if err != nil {42 panic(err)43 }44 client, err := kube.NewForConfig(config)45 if err != nil {46 panic(err)47 }48 return client49}50func GetModelPath(user string, name string, tag string) string {51 return fmt.Sprintf("/model/%s/%s/%s", user, name, tag)52}53func sayHello(w http.ResponseWriter, r *http.Request) {54 response, _ := json.Marshal(map[string]string{55 "status": "OK",56 })57 w.Header().Set("Content-Type", "application/json")58 w.Write(response)59}60func ping(w http.ResponseWriter, r *http.Request) {61 vars := mux.Vars(r)62 user := vars["user"]63 if user == "" {64 http.Error(w, "User name cannot be empty", 400)65 return66 }67 printRequest(r, "user", user)68 response, _ := json.Marshal(map[string]string{69 "status": "OK",70 })71 w.Header().Set("Content-Type", "application/json")72 w.Write(response)73}74func getRepoURL(w http.ResponseWriter, r *http.Request) {75 query := r.URL.Query()76 fmt.Println("[GET] Repo URL", query)77 user := query.Get("user") // dummy user78 name := query.Get("name") // project name79 if user == "" {80 http.Error(w, "User name cannot be empty", 400)81 return82 }83 if name == "" {84 http.Error(w, "Project name cannot be empty", 400)85 return86 }87 url, err := GetRepoURL(user, name)88 if err != nil {89 http.Error(w,90 fmt.Sprintf("Cannot get repo URL. Reason: %s", err.Error()),91 500)92 return93 }94 fmt.Println("url = ", url)95 response, _ := json.Marshal(map[string]string{96 "url": url,97 })98 w.Header().Set("Content-Type", "application/json")99 w.Write(response)100}101// contentType: by default is "application/octet-stream". Set to "ignore" to not102// use this in GCS sign.103func getDataURL(w http.ResponseWriter, r *http.Request) {104 query := r.URL.Query()105 user := query.Get("user") // dummy user106 name := query.Get("name") // project name107 cloud := query.Get("cloud") // cloud provider: gcloud108 verb := query.Get("verb") // HTTP verb: PUT | GET109 path := query.Get("path") // path to object110 contentType := query.Get("content-type")111 if contentType == "" {112 contentType = "application/octet-stream"113 }114 if cloud == "" {115 cloud = "gcloud"116 }117 fmt.Println("[GET] Data URL from Cloud Provider " + cloud)118 var url string119 var err error120 if cloud == "gcloud" {121 // Signed URL from Google Cloud Storage.122 url, err = GetGCloudStorageURL(user, name, path, verb, contentType)123 if err != nil {124 http.Error(w, err.Error(), 500)125 return126 }127 fmt.Println(url)128 }129 response, _ := json.Marshal(map[string]string{130 "url": url,131 })132 w.Header().Set("Content-Type", "application/json")133 w.Write(response)134}135func listModels(w http.ResponseWriter, r *http.Request) {136 vars := mux.Vars(r)137 user := vars["user"]138 var err error139 var ms []models.Model140 if user == "_" {141 ms, err = models.ListModelAll(db)142 } else {143 ms, err = models.ListModelByUser(db, user)144 }145 if err != nil {146 http.Error(w, fmt.Sprintf("Error: %s", err.Error()), 500)147 return148 }149 // Convert model objects to maps.150 var results []map[string]interface{}151 for _, model := range ms {152 metadata := model.ToMap()153 // name := metadata["name"].(string)154 // tag := metadata["tag"].(string)155 // Disable status probe to save response time.156 // metadata["status"] = getModelStatus(user, name, tag)157 metadata["status"] = "UNKNOWN"158 results = append(results, metadata)159 }160 sort.Sort(models.ModelList(results))161 response, _ := json.Marshal(results)162 w.Header().Set("Content-Type", "application/json")163 w.Write(response)164}165func listModelTags(w http.ResponseWriter, r *http.Request) {166 vars := mux.Vars(r)167 userId := vars["user"]168 modelName := vars["model"]169 var err error170 var ms []models.Model171 ms, err = models.ListModelByUserAndName(db, userId, modelName)172 if err != nil {173 http.Error(w, fmt.Sprintf("Error: %s", err.Error()), 500)174 return175 }176 var results []map[string]interface{}177 for _, model := range ms {178 results = append(results, model.ToMap())179 }180 // Special handling for empty list.181 // json.Marshal would return "null".182 if len(results) == 0 {183 w.Write([]byte("[]"))184 return185 }186 response, err := json.Marshal(results)187 if err != nil {188 http.Error(w, fmt.Sprintf("Error: %s", err.Error()), 500)189 return190 }191 w.Header().Set("Content-Type", "application/json")192 w.Write(response)193}194func postLanding(w http.ResponseWriter, r *http.Request) {195 // Read request JSON.196 var data map[string]interface{}197 err := json.NewDecoder(r.Body).Decode(&data)198 if err != nil {199 http.Error(w, "Unable to read the HTTP request body", 400)200 return201 }202 fmt.Println("[POST] A new user is landing", data)203 landing := models.Landing{204 Email: data["email"].(string),205 }206 err = models.AddLanding(db, landing)207 if err != nil {208 http.Error(w, err.Error(), 500)209 return210 }211 w.WriteHeader(200)212}213func getAuth(w http.ResponseWriter, r *http.Request) {214 query := r.URL.Query()215 fmt.Println("[GET] User Auth with Code", query)216 code := query.Get("code") // dummy user217 redirect_uri := query.Get("redirect_uri")218 // Get authentication token.219 resp, err := grequests.Post("https://dummyai.auth0.com/oauth/token",220 &grequests.RequestOptions{221 Headers: map[string]string{222 "Content-type": "application/x-www-form-urlencoded",223 },224 Data: map[string]string{225 "client_id": "MJciGnUXnD850clHoLM4tkltFlkgJGPs",226 "redirect_uri": redirect_uri,227 "client_secret": "Ck7IWvxkZ0rXq4AVIe6wzP_VNJk1bYRG7rV_IAzdNcE5UKKKItLfdBIPPWfma9Jb",228 "code": code,229 "grant_type": "authorization_code",230 }})231 if err != nil {232 http.Error(w, err.Error(), 500)233 return234 }235 var data map[string]interface{}236 if err := resp.JSON(&data); err != nil {237 http.Error(w, err.Error(), 500)238 return239 }240 resp, err = grequests.Get("https://dummyai.auth0.com/userinfo?access_token="+data["access_token"].(string),241 &grequests.RequestOptions{})242 if err != nil {243 http.Error(w, err.Error(), 500)244 return245 }246 var profile map[string]interface{}247 if err := resp.JSON(&profile); err != nil {248 http.Error(w, err.Error(), 500)249 return250 }251 data["profile"] = profile252 response, err := json.Marshal(data)253 if err != nil {254 http.Error(w, err.Error(), 500)255 return256 }257 w.Header().Set("Content-Type", "application/json")258 w.WriteHeader(200)259 w.Write(response)260}261// TODO: put lock on this.262func putModel(w http.ResponseWriter, r *http.Request) {263 vars := mux.Vars(r)264 user := vars["user"]265 name := vars["model"]266 tag := vars["tag"]267 var params map[string]interface{}268 err := json.NewDecoder(r.Body).Decode(¶ms)269 if err != nil {270 http.Error(w, err.Error(), 400)271 return272 }273 commit := ""274 if _, ok := params["commit"]; ok {275 commit = params["commit"].(string)276 }277 fmt.Println(fmt.Sprintf("[PUT] Create / Update model metadata %s/%s:%s",278 user, name, tag))279 model, err := models.GetModelById(db, models.ModelId(user, name, tag))280 if err == nil { // Model already exists. Update it.281 // Merge metdata.282 var currMetadata map[interface{}]interface{}283 yaml.Unmarshal([]byte(model.Metadata), &currMetadata)284 var newMetadata map[interface{}]interface{}285 yaml.Unmarshal([]byte(params["metadata"].(string)), &newMetadata)286 updateInterfaceMap(currMetadata, newMetadata)287 // Overwrite Spec.288 var specYAML string289 if params["spec"] != nil {290 specYAML = params["spec"].(string)291 } else {292 specYAML = model.Spec293 }294 if commit == "" {295 commit = model.Commit296 }297 metadataBytes, err := yaml.Marshal(cleanupInterfaceMap(currMetadata))298 if err != nil {299 http.Error(w, err.Error(), 500)300 return301 }302 metadataYAML := string(metadataBytes)303 fmt.Println("Model metadata", metadataYAML)304 fmt.Println("Model spec", specYAML)305 model = models.Model{306 UserId: vars["user"],307 Name: vars["model"],308 Tag: vars["tag"],309 Metadata: metadataYAML,310 Spec: specYAML,311 Commit: commit,312 Status: model.Status,313 }314 models.UpdateModel(db, model)315 return316 } else { // Create a new model.317 model = models.Model{318 UserId: vars["user"],319 Name: vars["model"],320 Tag: vars["tag"],321 Spec: "",322 Metadata: params["metadata"].(string),323 Commit: commit,324 Status: "INACTIVE",325 }326 err = models.AddModel(db, model)327 if err != nil {328 http.Error(w, err.Error(), 500)329 return330 }331 w.WriteHeader(200)332 }333}334func getUser(w http.ResponseWriter, r *http.Request) {335 vars := mux.Vars(r)336 user := vars["user"]337 printRequest(r)338 if user == "" {339 http.Error(w, "Must provide a username to get his/her profile", 400)340 return341 }342 accessToken, err := GetAuth0AccessToken()343 if err != nil {344 http.Error(w, err.Error(), 500)345 return346 }347 users, err := GetAuth0UsersByQuery(accessToken, "username:"+user)348 if err != nil {349 http.Error(w, err.Error(), 500)350 return351 }352 if len(users) != 1 {353 http.Error(w, "Found no user or more than 1 users", 500)354 return355 }356 w.Header().Set("Content-Type", "application/json")357 response, err := json.Marshal(users[0])358 if err != nil {359 http.Error(w, fmt.Sprintf("Unable to dump JSON: %s. %v", err.Error(), users[0]), 500)360 return361 }362 w.WriteHeader(200)363 w.Write(response)364}365func putModelPageView(w http.ResponseWriter, r *http.Request) {366 vars := mux.Vars(r)367 user := vars["user"]368 name := vars["model"]369 tag := vars["tag"]370 printRequest(r)371 err := IncrPageViewCount(kvstore, models.ModelId(user, name, tag))372 if err != nil {373 http.Error(w, err.Error(), 500)374 }375}376func getModelPageView(w http.ResponseWriter, r *http.Request) {377 vars := mux.Vars(r)378 user := vars["user"]379 name := vars["model"]380 tag := vars["tag"]381 printRequest(r)382 result, err := GetPageViewCounts(kvstore, models.ModelId(user, name, tag))383 if err != nil {384 http.Error(w, err.Error(), 500)385 return386 }387 w.Header().Set("Content-Type", "application/json")388 response, err := json.Marshal(result)389 if err != nil {390 http.Error(w, fmt.Sprintf("Unable to dump JSON: %s. %v", err.Error(), result), 500)391 return392 }393 w.WriteHeader(200)394 w.Write(response)395}396func putModelDemoRun(w http.ResponseWriter, r *http.Request) {397 vars := mux.Vars(r)398 user := vars["user"]399 name := vars["model"]400 tag := vars["tag"]401 printRequest(r)402 err := IncrDemoRunCount(kvstore, models.ModelId(user, name, tag))403 if err != nil {404 http.Error(w, err.Error(), 500)405 }406}407func getModelDemoRun(w http.ResponseWriter, r *http.Request) {408 vars := mux.Vars(r)409 user := vars["user"]410 name := vars["model"]411 tag := vars["tag"]412 printRequest(r)413 result, err := GetDemoRunCount(kvstore, models.ModelId(user, name, tag))414 if err != nil {415 http.Error(w, err.Error(), 500)416 return417 }418 w.Header().Set("Content-Type", "application/json")419 response, err := json.Marshal(result)420 if err != nil {421 http.Error(w, fmt.Sprintf("Unable to dump JSON: %s. %v", err.Error(), result), 500)422 return423 }424 w.WriteHeader(200)425 w.Write(response)426}427func teardownModel(user string, name string, tag string) error {428 deployName := getModelDeployName(user, name, tag)429 path := GetModelPath(user, name, tag)430 err := TeardownDeploy(kubeClient, deployName)431 if err != nil {432 return err433 }434 err = TeardownService(kubeClient, deployName)435 if err != nil {436 return err437 }438 err = RemoveServiceFromIngress(kubeClient, path)439 if err != nil {440 return err441 }442 return nil443}444func deleteModelWithTag(w http.ResponseWriter, r *http.Request) {445 vars := mux.Vars(r)446 printRequest(r, fmt.Sprintf("Deleting model %s/%s:%s", vars["user"], vars["model"], vars["tag"]))447 // Get ModelId based on user, model name and tag.448 user := vars["user"]449 name := vars["model"]450 tag := vars["tag"]451 modelId := models.ModelId(user, name, tag)452 // Check if the model is live.453 status := getModelStatus(user, name, tag)454 if status == "LIVE" {455 err := teardownModel(user, name, tag)456 if err != nil {457 http.Error(w, err.Error(), 500)458 return459 }460 }461 // Otherwise, delete the model from database.462 err := models.DeleteModel(db, modelId)463 if err != nil {464 http.Error(w, err.Error(), 500)465 return466 }467}468func deleteModels(w http.ResponseWriter, r *http.Request) {469 vars := mux.Vars(r)470 printRequest(r, fmt.Sprintf("Deleting model %s/%s:%s", vars["user"], vars["model"], vars["tag"]))471 // Get ModelId based on user, model name and tag.472 userId := vars["user"]473 modelName := vars["model"]474 ms, err := models.ListModelByUserAndName(db, userId, modelName)475 if err != nil {476 http.Error(w, err.Error(), 500)477 return478 }479 fmt.Println("models to delete", ms)480 for _, model := range ms {481 tag := model.Tag482 modelId := models.ModelId(userId, modelName, tag)483 // Check if the model is live.484 status := getModelStatus(userId, modelName, tag)485 if status == "LIVE" {486 err := teardownModel(userId, modelName, tag)487 if err != nil {488 http.Error(w, err.Error(), 500)489 return490 }491 }492 // Delete the model from database.493 err := models.DeleteModel(db, modelId)494 if err != nil {495 http.Error(w, err.Error(), 500)496 return497 }498 }499 w.WriteHeader(200)500}501func getModelDeployName(userId string, modelName string, tag string) string {502 modelId := models.ModelId(userId, modelName, tag)503 model, err := models.GetModelById(db, modelId)504 if err != nil {505 return ""506 }507 return GetDeployName(userId, modelName, tag, model.Commit)508}509func getModelStatus(userId string, modelName string, tag string) string {510 deployName := getModelDeployName(userId, modelName, tag)511 pods, err := GetPodsByDeployName(kubeClient, deployName)512 if err != nil {513 return models.StatusError514 }515 if len(pods) == 0 {516 return models.StatusInactive517 }518 phase := pods[0].Status.Phase519 fmt.Println("Pod phase", phase, deployName)520 if phase == "Pending" {521 return models.StatusPending522 } else if phase == "Running" {523 return models.StatusLive524 } else if phase == "Terminating" {525 return models.StatusTerminating526 } else {527 return models.StatusError528 }529}530func getModel(w http.ResponseWriter, r *http.Request) {531 vars := mux.Vars(r)532 user := vars["user"]533 name := vars["model"]534 tag := vars["tag"]535 printRequest(r, fmt.Sprintf("Get model %s/%s:%s", user, name, tag))536 w.Header().Set("Content-Type", "application/json")537 // Get ModelId based on user, model name and tag.538 modelId := models.ModelId(user, name, tag)539 // fmt.Printf("modelId = %s\n", modelId)540 model, err := models.GetModelById(db, modelId)541 var status string542 var metadataYAML string543 var specYAML string544 if model.Commit == "" {545 status = models.StatusEmpty546 metadataYAML = model.Metadata547 specYAML = model.Spec548 } else if err == nil {549 status = getModelStatus(user, name, tag)550 metadataYAML = model.Metadata551 specYAML = model.Spec552 } else {553 status = models.StatusNone554 metadataYAML = ""555 specYAML = ""556 }557 var metadata map[interface{}]interface{}558 yaml.Unmarshal([]byte(metadataYAML), &metadata)559 var spec map[interface{}]interface{}560 yaml.Unmarshal([]byte(specYAML), &spec)561 // By default YAML returns map[interface{}][interface{}} for nested maps.562 // See https://github.com/go-yaml/yaml/issues/139563 results := cleanupInterfaceMap(map[interface{}]interface{}{564 "status": status,565 "metadata": metadata,566 "spec": spec,567 })568 response, err := json.Marshal(results)569 if err != nil {570 http.Error(w, fmt.Sprintf("Unable to dump JSON: %s. %v", err.Error(), results), 500)571 return572 }573 w.WriteHeader(200)574 w.Write(response)575}576func postModel(w http.ResponseWriter, r *http.Request) {577 vars := mux.Vars(r)578 user := vars["user"]579 name := vars["model"]580 tag := vars["tag"]581 fmt.Printf("[POST] Changing the state of the model %s/%s:%s\n",582 user, name, tag)583 // Get ModelId based on user, model name and tag.584 modelId := models.ModelId(user, name, tag)585 fmt.Printf("modelId = %s\n", modelId)586 // Read request JSON.587 var data map[string]interface{}588 err := json.NewDecoder(r.Body).Decode(&data)589 if err != nil {590 http.Error(w, "Unable to read the HTTP request body", 400)591 return592 }593 model, err := models.GetModelById(db, modelId)594 if err != nil {595 http.Error(w, err.Error(), 500)596 return597 }598 fmt.Println(model)599 if data["action"] == "deploy" {600 fmt.Println(fmt.Sprintf("[POST] Deploying model %s/%s:%s", user, name, tag))601 replicas, ok := data["replicas"].(int)602 if !ok {603 replicas = 1604 }605 deployName, err := CreateDeployV2(kubeClient, user, name, tag, model.Commit, model.Spec, replicas)606 if err != nil {607 http.Error(w, "Unable to create deployment. "+err.Error(), 500)608 return609 }610 err = CreateService(kubeClient, deployName)611 if err != nil {612 http.Error(w, "Unable to expose deployment as service. "+err.Error(), 500)613 // Rollback deployment.614 TeardownDeploy(kubeClient, deployName)615 return616 }617 path := GetModelPath(user, name, tag)618 RemoveServiceFromIngress(kubeClient, path)619 err = AddServiceToIngress(kubeClient, path, deployName)620 if err != nil {621 http.Error(w, "Unable to create Ingress endpoint for "+deployName, 500)622 // Rollback service and deployment.623 TeardownService(kubeClient, deployName)624 TeardownDeploy(kubeClient, deployName)625 return626 }627 w.WriteHeader(200)628 return629 } else if data["action"] == "ping" {630 w.WriteHeader(200)631 return632 } else if data["action"] == "teardown" {633 err = teardownModel(user, name, tag)634 if err != nil {635 http.Error(w, err.Error(), 500)636 }637 return638 } else {639 w.WriteHeader(400)640 w.Write([]byte(fmt.Sprintf("[POST] Deploying model using unknown action %s", data["action"])))641 }642}643func putJob(w http.ResponseWriter, r *http.Request) {644 vars := mux.Vars(r)645 user := vars["user"]646 repo := vars["repo"]647 commit := vars["commit"]648 fmt.Println(fmt.Sprintf("[PUT] Create a new experiment job %s/%s:%s",649 user, repo, commit))650 // Check if the job already exists.651 _, err := models.GetJobById(db, models.JobId(user, repo, commit))652 if err == nil {653 http.Error(w, fmt.Sprintf("Experiment %s/%s:%s already exists",654 user, repo, commit), 500)655 return656 }657 // Extract parameters.658 var params map[string]interface{}659 err = json.NewDecoder(r.Body).Decode(¶ms)660 if err != nil {661 http.Error(w, err.Error(), 400)662 return663 }664 yamlString := params["yaml"].(string)665 if yamlString == "" {666 http.Error(w, "Experiment YAML not provided", 400)667 return668 }669 // Launch job through Kubernetes controller.670 _, err = CreateJobV1(kubeClient, commit, yamlString)671 if err != nil {672 http.Error(w, fmt.Sprintf("Failed to create job %s", err.Error()), 500)673 return674 }675 // Write experiment job to database.676 var config map[string]interface{}677 yaml.Unmarshal([]byte(yamlString), &config)678 job := models.Job{679 UserId: user,680 Repo: repo,681 Commit: commit,682 Name: config["name"].(string),683 Yaml: yamlString,684 Status: "PENDING",685 }686 err = models.AddJob(db, job)687 if err != nil {688 http.Error(w, err.Error(), 500)689 return690 }691 w.WriteHeader(200)692 w.Write([]byte(""))693}694// For Jobs. Deprecated.695func logJob(w http.ResponseWriter, r *http.Request) {696 vars := mux.Vars(r)697 user := vars["user"]698 repo := vars["repo"]699 commit := vars["commit"]700 fmt.Println(fmt.Sprintf("[PUT] Logging an experiment job %s/%s:%s",701 user, repo, commit))702 jobName := GetJobName(user, repo, commit)703 w.WriteHeader(200)704 err := StreamLogsFromJob(kubeClient, jobName, true, w)705 if err != nil {706 http.Error(w, fmt.Sprintf("Error: %s", err.Error()), 500)707 }708}709func logModel(w http.ResponseWriter, r *http.Request) {710 vars := mux.Vars(r)711 userId := vars["user"]712 modelName := vars["model"]713 tag := vars["tag"]714 modelId := models.ModelId(userId, modelName, tag)715 model, err := models.GetModelById(db, modelId)716 if err != nil {717 http.Error(w, err.Error(), 500)718 }719 follow, err := strconv.ParseBool(r.URL.Query().Get("follow"))720 if err != nil {721 http.Error(w, err.Error(), 500)722 }723 fmt.Println(fmt.Sprintf("[PUT] Logging a model deployment %s/%s:%s",724 userId, modelName, tag))725 w.WriteHeader(200)726 flushedWriter := FlushedWriter{HttpWriter: w}727 if err := StreamLogsFromModel(kubeClient, userId, modelName, tag, model.Commit, follow, &flushedWriter); err != nil {728 http.Error(w, err.Error(), 500)729 }730}731// Put a rating in the database.732// The request body contains a JSON object {"value": <rating_value>}733func putRating(w http.ResponseWriter, r *http.Request) {734 vars := mux.Vars(r)735 userId := vars["userId"]736 modelId := vars["modelId"]737 printRequest(r)738 var params map[string]interface{}739 err := json.NewDecoder(r.Body).Decode(¶ms)740 if err != nil {741 http.Error(w, err.Error(), 400)742 return743 }744 value := params["value"].(float64)745 err = models.UpdateRating(db, userId, modelId, value)746 if err != nil {747 http.Error(w, err.Error(), 500)748 }749}750// Get the rating from the database.751func getRating(w http.ResponseWriter, r *http.Request) {752 vars := mux.Vars(r)753 userId := vars["userId"]754 modelId := vars["modelId"]755 printRequest(r)756 rating, err := models.GetRatingById(db, models.RatingId(userId, modelId))757 value := 0.758 if err == nil {759 value = rating.Value760 }761 w.WriteHeader(200)762 response, _ := json.Marshal(map[string]float64{763 "value": value,764 })765 w.Header().Set("Content-Type", "application/json")766 w.Write(response)767}768// Delete the rating from the database.769func deleteRating(w http.ResponseWriter, r *http.Request) {770 vars := mux.Vars(r)771 userId := vars["userId"]772 modelId := vars["modelId"]773 printRequest(r)774 err := models.DeleteRating(db, models.RatingId(userId, modelId))775 if err != nil {776 http.Error(w, err.Error(), 500)777 return778 }779 w.WriteHeader(200)780}781func putExample(w http.ResponseWriter, r *http.Request) {782 vars := mux.Vars(r)783 printRequest(r)784 var params map[string]interface{}785 err := json.NewDecoder(r.Body).Decode(¶ms)786 if err != nil {787 http.Error(w, err.Error(), 400)788 return789 }790 var clientVersion string791 if clientVersionInterface, ok := params["clientVersion"]; ok {792 clientVersion = clientVersionInterface.(string)793 } else {794 http.Error(w, "putExample request must have key clientVersion", 400)795 return796 }797 var clientLatency float32798 if clientLatencyInterface, ok := params["clientLatency"]; ok {799 clientLatency = float32(clientLatencyInterface.(float64))800 } else {801 http.Error(w, "putExample request must have key clientLatency", 400)802 return803 }804 example := models.Example{805 Uid: vars["exampleId"],806 ModelId: vars["user"] + "/" + vars["model"] + ":" + vars["tag"],807 ClientLatency: clientLatency,808 ClientVersion: clientVersion,809 }810 err = models.AddExample(db, example)811 if err != nil {812 http.Error(w, err.Error(), 500)813 return814 }815 w.WriteHeader(200)816}817func putDemoExample(w http.ResponseWriter, r *http.Request) {818 vars := mux.Vars(r)819 example := models.DemoExample{820 Uid: vars["exampleId"],821 ModelId: vars["user"] + "/" + vars["model"] + ":" + vars["tag"],822 }823 err := models.AddDemoExample(db, example)824 if err != nil {825 http.Error(w, err.Error(), 500)826 return827 }828 w.WriteHeader(200)829}830func listExamples(w http.ResponseWriter, r *http.Request) {831 vars := mux.Vars(r)832 modelId := vars["user"] + "/" + vars["model"] + ":" + vars["tag"]833 examples, err := models.ListExamples(db, modelId)834 if err != nil {835 http.Error(w, err.Error(), 500)836 return837 }838 var results []map[string]interface{}839 for _, example := range examples {840 results = append(results, map[string]interface{}{841 "exampleId": example.Uid,842 "modelId": example.ModelId,843 "clientVersion": example.ClientVersion,844 "clientLatency": example.ClientLatency,845 })846 }847 response, _ := json.Marshal(results)848 w.Header().Set("Content-Type", "application/json")849 w.WriteHeader(200)850 w.Write(response)851}852func listDemoExamples(w http.ResponseWriter, r *http.Request) {853 vars := mux.Vars(r)854 modelId := vars["user"] + "/" + vars["model"] + ":" + vars["tag"]855 examples, err := models.ListDemoExamples(db, modelId)856 if err != nil {857 http.Error(w, err.Error(), 500)858 return859 }860 var results []map[string]interface{}861 for _, example := range examples {862 results = append(results, map[string]interface{}{863 "exampleId": example.Uid,864 "modelId": example.ModelId,865 })866 }867 response, _ := json.Marshal(results)868 w.Header().Set("Content-Type", "application/json")869 w.WriteHeader(200)870 w.Write(response)871}872func main() {873 var err error874 // Initialize Global Constants based on environment variable.875 InitGlobal()876 if len(os.Args) < 2 {877 fmt.Println("Usage: master-server [migrate / start]")878 return879 }880 command := os.Args[1]881 if command == "migrate" {882 // migrate database schema.883 db = models.CreateDB(DBAddress)884 defer db.Close()885 models.MigrateDB(db)886 } else if command == "migrate092417" {887 // Migrate database schema.888 // Currently, model metadata are separate for every version of the model.889 // We want to have them share same metadata.890 db = models.CreateDB(DBAddress)891 db.Model(&models.Model{}).DropColumn("repo")892 models.MigrateDB(db)893 } else if command == "start" {894 // Database.895 db = models.CreateDB(DBAddress)896 kvstore = models.CreateKeyValueStore(RedisDBAddress, RedisDBPassword)897 kubeClient = CreateClient(KubeConfig)898 defer db.Close()899 // Authorization.900 jwtMiddleware := jwtmiddleware.New(jwtmiddleware.Options{901 ValidationKeyGetter: func(token *jwt.Token) (interface{}, error) {902 // https://github.com/dgrijalva/jwt-go/issues/147903 // Need to parse RSA public key first.904 key, _ := jwt.ParseRSAPublicKeyFromPEM([]byte(JWT_PUBLIC_KEY_CLI))905 return key, nil906 },907 // When set, the middleware verifies that tokens are signed with the specific signing algorithm908 // If the signing method is not constant the ValidationKeyGetter callback can be used to implement additional checks909 // Important to avoid security issues described here: https://auth0.com/blog/2015/03/31/critical-vulnerabilities-in-json-web-token-libraries/910 SigningMethod: jwt.SigningMethodRS256,911 ErrorHandler: AuthenticationError,912 })913 fmt.Println(jwtMiddleware)914 // HTTP Router.915 router := mux.NewRouter()916 // Authentication based on JWT.917 router.Handle(`/git/{rest:[a-zA-Z0-9=\.\-\/]+}`, negroni.New(918 negroni.HandlerFunc(jwtMiddleware.HandlerWithNext),919 negroni.Wrap(GetGitRequestHandler()),920 ))921 // No authentication. For debugging.922 // router.Handle(`/git/{rest:[a-zA-Z0-9=\-\/]+}`, GetGitRequestHandler())923 router.Handle("/ping/{user}", jwtMiddleware.Handler(http.HandlerFunc(ping))).Methods("GET")924 router.HandleFunc("/", sayHello).Methods("GET")925 router.HandleFunc("/url/code", getRepoURL).Methods("GET")926 router.HandleFunc("/url/data", getDataURL).Methods("GET")927 router.HandleFunc("/users/{user}/models/{model}/{tag}", getModel).Methods("GET")928 // PUT is used to create / update metadata. POST is used to control model deployments.929 router.HandleFunc("/users/{user}/models/{model}/{tag}", putModel).Methods("PUT")930 router.HandleFunc("/users/{user}/models/{model}/{tag}", postModel).Methods("POST")931 router.HandleFunc("/users/{user}/models/{model}", deleteModels).Methods("DELETE")932 router.HandleFunc("/users/{user}/models/{model}/{tag}", deleteModelWithTag).Methods("DELETE")933 router.HandleFunc("/users/{user}/models/{model}/{tag}/log", logModel).Methods("GET")934 router.HandleFunc("/users/{user}/models/{model}/{tag}/examples/{exampleId}", putExample).Methods("PUT")935 router.HandleFunc("/users/{user}/models/{model}/{tag}/examples", listExamples).Methods("GET")936 router.HandleFunc("/users/{user}/models/{model}/{tag}/demo-examples/{exampleId}", putDemoExample).Methods("PUT")937 router.HandleFunc("/users/{user}/models/{model}/{tag}/demo-examples", listDemoExamples).Methods("GET")938 // Get user metadata.939 router.HandleFunc("/users/{user}", getUser).Methods("GET")940 // Endpoints for analytics.941 router.HandleFunc("/users/{user}/models/{model}/{tag}/analytics/page-view", putModelPageView).Methods("PUT")942 router.HandleFunc("/users/{user}/models/{model}/{tag}/analytics/page-view", getModelPageView).Methods("GET")943 router.HandleFunc("/users/{user}/models/{model}/{tag}/analytics/demo-run", putModelDemoRun).Methods("PUT")944 router.HandleFunc("/users/{user}/models/{model}/{tag}/analytics/demo-run", getModelDemoRun).Methods("GET")945 // List models.946 router.HandleFunc("/users/{user}/models", listModels).Methods("GET")947 router.HandleFunc("/users/{user}/models/{model}", listModelTags).Methods("GET")948 router.HandleFunc("/job/{user}/{repo}/{commit}", putJob).Methods("PUT")949 router.HandleFunc("/job/{user}/{repo}/{commit}/log", logJob).Methods("GET")950 // Endpoints for manipulating user rating for models.951 router.HandleFunc(`/rating/{userId}/{modelId:(?:.|\/)*}`, putRating).Methods("PUT")952 router.HandleFunc("/rating/{userId}/{modelId:.*}", getRating).Methods("GET")953 router.HandleFunc(`/rating/{userId}/{modelId:[.\/]*}`, deleteRating).Methods("DELETE")954 // Other endpoints.955 router.HandleFunc("/landing", postLanding).Methods("POST")956 router.HandleFunc("/auth", getAuth).Methods("GET")957 fmt.Println(fmt.Sprintf("0.0.0.0:%d", MasterPort))958 fmt.Println("Starting HTTP master server")959 server := &http.Server{960 Handler: router,961 Addr: fmt.Sprintf("0.0.0.0:%d", MasterPort),962 WriteTimeout: 3600 * time.Second,963 ReadTimeout: 3600 * time.Second,964 }965 err = server.ListenAndServe()966 if err != nil {967 panic(err)968 }969 } else {970 panic("Unknown command " + command)971 }972}...
kube.go
Source:kube.go
1package main2import (3 "bytes"4 "encoding/json"5 "errors"6 "fmt"7 "github.com/dummy-ai/mvp/master-server/models"8 "github.com/google/uuid"9 "io"10 "k8s.io/api/apps/v1beta1"11 batchv1 "k8s.io/api/batch/v1"12 v1 "k8s.io/api/core/v1"13 v1beta1Extensions "k8s.io/api/extensions/v1beta1"14 k8sErrors "k8s.io/apimachinery/pkg/api/errors"15 "k8s.io/apimachinery/pkg/api/resource"16 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"17 "k8s.io/apimachinery/pkg/util/intstr"18 "k8s.io/apimachinery/pkg/util/yaml"19 kube "k8s.io/client-go/kubernetes"20 "net/http"21 "strconv"22 "strings"23 "text/template"24 "time"25)26const kubeNamespace string = v1.NamespaceDefault27const ingressName string = "dummy"28// Flushed writer for streaming logs.29type FlushedWriter struct {30 HttpWriter http.ResponseWriter31}32func (w *FlushedWriter) Write(p []byte) (n int, err error) {33 n, err = w.HttpWriter.Write(p)34 w.HttpWriter.(http.Flusher).Flush()35 return n, err36}37// V1 deployment template38// Assume the user has packaged the model in a docker container.39// All we need to do is to ship the container to cloud.40const templateDeployV1 = `41apiVersion: extensions/v1beta142kind: Deployment43metadata:44 name: {{.Name}} 45spec:46 replicas: {{.Replica}}47 template:48 metadata:49 name: {{.Name}}50 labels:51 app: {{.Name}}52 spec:53 containers:54 - name: main55 image: {{.Image}}56 ports:57 - containerPort: 590058 tty: true59`60// V2 deployment template61// Assume the user pushes code and assets.62// Warpdrive creates a container that mounts these resources.63const templateDeployV2 = `64apiVersion: extensions/v1beta165kind: Deployment66metadata:67 name: {{.Name}} 68spec:69 replicas: {{.Replica}}70 template:71 metadata:72 name: {{.Name}}73 labels:74 app: {{.Name}}75 spec:76 containers:77 - name: main78 image: {{.Image}}79 ports:80 - containerPort: 590081 tty: true82 volumeMounts:83 - name: nfs84 mountPath: "/mnt/nfs"85 - name: cloudfs86 mountPath: "/mnt/cloudfs"87 - name: fuse88 mountPath: "/dev/fuse"89 - name: nvidia90 mountPath: "/usr/local/nvidia"91 securityContext:92 privileged: true93 capabilities:94 add:95 - SYS_ADMIN96 volumes:97 - name: nfs98 persistentVolumeClaim:99 claimName: nfs-claim100 - name: cloudfs101 hostPath:102 path: /mnt/cloudfs/{{.AssetRoot}}103 - name: fuse104 hostPath:105 path: /dev/fuse106 - name: nvidia107 hostPath:108 path: /var/lib/nvidia-docker/volumes/nvidia_driver/384.81109`110// experiment job template.111// Assume the user already pushes code and asset112// Warpdrive creates a job that113// - mounts these resources in the container.114// - executes the command.115const templateJobV1 = `116apiVersion: batch/v1117kind: Job118metadata:119 name: {{.Name}} 120 labels:121 owner: dummy122spec:123 replicas: 1124 activeDeadlineSeconds: 86400 # in seconds125 template:126 metadata:127 labels:128 owner: nobody129 spec:130 restartPolicy: Never131 containers:132 - name: main133 image: {{.Image}}134 ports:135 - containerPort: 5900136 tty: true137 volumeMounts:138 - name: nfs139 mountPath: "/mnt/nfs"140 - name: secrets141 mountPath: "/secrets"142 - name: fuse143 mountPath: "/dev/fuse"144 - name: nvidia145 mountPath: "/usr/local/nvidia"146 securityContext:147 privileged: true148 capabilities:149 add:150 - SYS_ADMIN151 volumes:152 - name: nfs153 persistentVolumeClaim:154 claimName: nfs-claim155 - name: secrets156 configMap:157 name: secrets158 - name: fuse159 hostPath:160 path: /dev/fuse161 - name: nvidia162 hostPath:163 path: /var/lib/nvidia-docker/volumes/nvidia_driver/375.26164`165func panicNil(err error) {166 if err != nil {167 panic(err)168 }169}170func decodeYAML(yamlString string, output interface{}) error {171 reader := strings.NewReader(yamlString)172 return yaml.NewYAMLOrJSONDecoder(reader, 32).Decode(output)173}174func SpecFromTemplate(templateString string, data interface{}, output interface{}) error {175 template, err := template.New(uuid.New().String()).Parse(templateString)176 panicNil(err)177 buf := new(bytes.Buffer)178 err = template.Execute(buf, data)179 panicNil(err)180 specPod := buf.String()181 return decodeYAML(specPod, &output)182}183func GetDeployName(user string, model string, tag string, commit string) string {184 user = strings.Replace(user, "-", "--", -1)185 model = strings.Replace(model, "-", "--", -1)186 model = strings.Replace(model, ".", "-1", -1)187 // convert upper case characters to lower case with "-" prefix.188 for ch := 'a'; ch <= 'z'; ch++ {189 model = strings.Replace(model, string(ch+'A'-'a'), "-"+string(ch), -1)190 }191 tag = strings.Replace(tag, ".", "-1", -1)192 return "deploy-" + user + "-" + model + "-" + tag + "-" + commit[:5]193}194func GetJobName(user string, repo string, commit string) string {195 jobId := models.JobId(user, repo, commit)196 return "job-" + jobId197}198func CreateDeployV1(client *kube.Clientset, name string, image string, replica int) (string, error) {199 // Instantiate templates.200 args := struct {201 Name string202 Replica int203 Image string204 }{205 name,206 replica,207 image,208 }209 var deployment v1beta1.Deployment210 err := SpecFromTemplate(templateDeployV1, args, &deployment)211 if err != nil {212 return "", err213 }214 fmt.Println(deployment)215 // Create deployment.216 deploymentClient := client.AppsV1beta1().Deployments(kubeNamespace)217 result, err := deploymentClient.Create(&deployment)218 // Extract results.219 if err != nil {220 return "", err221 }222 return result.GetObjectMeta().GetName(), nil223}224func CreateDeployV2HTTP(client *kube.Clientset, user string, name string, tag string, commit string, config map[string]interface{}, replica int) (string, error) {225 // Get basic properties.226 image := config["image"].(string)227 resources := config["resources"].(map[string]interface{})228 workPath := config["work_path"].(string)229 // Create git worktree for the container.230 err := CreateRepoMirror(user, name, commit)231 if err != nil {232 fmt.Println("Error: " + err.Error())233 }234 // Command to run in container.235 var command []string236 // Add code root. This is where the code repo sits.237 command = append(command, "--code_root")238 command = append(command, GetRepoMirrorPath(user, name, commit))239 // Add asset root. This is where data / model weights sit.240 command = append(command, "--asset_root")241 command = append(command, "/mnt/cloudfs")242 // Add working path.243 command = append(command, "--work_path")244 command = append(command, workPath)245 // Add assets246 if config["assets"] != nil {247 assetInterfaces := config["assets"].([]interface{})248 if len(assetInterfaces) > 0 {249 command = append(command, "--assets")250 for _, asset := range assetInterfaces {251 command = append(command, asset.(string))252 }253 }254 }255 // Add setup commands.256 command = append(command, "--cmd")257 if config["setup"] != nil {258 for _, line := range config["setup"].([]interface{}) {259 command = append(command, line.(string))260 }261 }262 // Add entrypoint command.263 command = append(command, config["main"].(map[string]interface{})["entrypoint"].(string))264 fmt.Println("command", command)265 // Basic derived properties for deployment.266 // TODO: use a better separator.267 deployName := GetDeployName(user, name, tag, commit)268 args := struct {269 Name string270 Replica int271 Image string272 AssetRoot string273 }{274 deployName,275 replica,276 image,277 GetAssetPath(user, name, commit),278 }279 var deployment v1beta1.Deployment280 err = SpecFromTemplate(templateDeployV2, args, &deployment)281 if err != nil {282 return "", err283 }284 // https://godoc.org/k8s.io/api/core/v1#Container285 container := deployment.Spec.Template.Spec.Containers[0]286 container.Command = []string{"moxel-http-driver"}287 container.Args = command288 fmt.Println(deployment)289 // Set up resource specs.290 container.Resources.Requests = make(v1.ResourceList)291 container.Resources.Limits = make(v1.ResourceList)292 if cpu, ok := resources["cpu"]; ok {293 container.Resources.Requests["cpu"] = resource.MustParse(fmt.Sprintf("%v", cpu))294 }295 if memory, ok := resources["memory"]; ok {296 container.Resources.Requests["memory"] = resource.MustParse(fmt.Sprintf("%v", memory))297 }298 if gpu, ok := resources["gpu"]; ok {299 container.Resources.Limits["alpha.kubernetes.io/nvidia-gpu"] = resource.MustParse(fmt.Sprintf("%v", gpu))300 }301 deployment.Spec.Template.Spec.Containers[0] = container // Update container spec.302 // Create deployment.303 deploymentClient := client.AppsV1beta1().Deployments(kubeNamespace)304 result, err := deploymentClient.Create(&deployment)305 // Extract results.306 if err != nil {307 return "", err308 }309 return result.GetObjectMeta().GetName(), nil310}311// Deployment spec: https://godoc.org/k8s.io/api/apps/v1beta1#DeploymentSpec312func CreateDeployV2Python(client *kube.Clientset, user string, name string, tag string, commit string, config map[string]interface{}, replica int) (string, error) {313 // Get basic properties.314 image := config["image"].(string)315 resources := config["resources"].(map[string]interface{})316 var envs map[string]interface{}317 if config["envs"] != nil {318 envs = config["envs"].(map[string]interface{})319 }320 workPath := config["work_path"].(string)321 // Create git worktree for the container.322 err := CreateRepoMirror(user, name, commit)323 if err != nil {324 return "", err325 }326 // Command to run in container.327 params := make(map[string]interface{})328 params["code_root"] = GetRepoMirrorPath(user, name, commit)329 params["asset_root"] = "/mnt/cloudfs"330 params["work_path"] = workPath331 params["assets"] = config["assets"]332 params["input_space"] = config["input_space"]333 params["output_space"] = config["output_space"]334 if config["setup"] != nil {335 params["setup"] = config["setup"]336 }337 params["entrypoint"] = config["main"].(map[string]interface{})["entrypoint"].(string)338 paramsJSON, err := json.Marshal(params)339 if err != nil {340 return "", err341 }342 // Basic derived properties for deployment.343 // TODO: use a better separator.344 deployName := GetDeployName(user, name, tag, commit)345 args := struct {346 Name string347 Replica int348 Image string349 AssetRoot string350 }{351 deployName,352 replica,353 image,354 GetAssetPath(user, name, commit),355 }356 var deployment v1beta1.Deployment357 err = SpecFromTemplate(templateDeployV2, args, &deployment)358 if err != nil {359 return "", err360 }361 // https://godoc.org/k8s.io/api/core/v1#Container362 container := deployment.Spec.Template.Spec.Containers[0]363 container.Command = []string{"moxel-python-driver"}364 container.Args = []string{"--json", string(paramsJSON)}365 container.Env = []v1.EnvVar{}366 for k, v := range envs {367 container.Env = append(container.Env, v1.EnvVar{368 Name: k,369 Value: v.(string),370 })371 fmt.Println("container.Env", container.Env)372 }373 fmt.Println(deployment)374 // Set up resource specs.375 container.Resources.Requests = make(v1.ResourceList)376 container.Resources.Limits = make(v1.ResourceList)377 if cpu, ok := resources["cpu"]; ok {378 container.Resources.Requests["cpu"] = resource.MustParse(fmt.Sprintf("%v", cpu))379 }380 if memory, ok := resources["memory"]; ok {381 container.Resources.Requests["memory"] = resource.MustParse(fmt.Sprintf("%v", memory))382 }383 if gpu, ok := resources["gpu"]; ok {384 container.Resources.Limits["alpha.kubernetes.io/nvidia-gpu"] = resource.MustParse(fmt.Sprintf("%v", gpu))385 container.Env = append(container.Env, v1.EnvVar{386 Name: "LD_LIBRARY_PATH",387 Value: "/usr/local/nvidia/lib64",388 })389 container.Env = append(container.Env, v1.EnvVar{390 Name: "PATH",391 Value: "/opt/conda/bin:/opt/caffe/build/tools:/opt/caffe/python:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/nvidia/bin",392 })393 }394 deployment.Spec.Template.Spec.Containers[0] = container // Update container spec.395 // Create deployment.396 deploymentClient := client.AppsV1beta1().Deployments(kubeNamespace)397 result, err := deploymentClient.Create(&deployment)398 // Extract results.399 if err != nil {400 return "", err401 }402 return result.GetObjectMeta().GetName(), nil403}404// Return (deployName, error)405func CreateDeployV2(client *kube.Clientset, user string, name string, tag string, commit string, yamlString string, replica int) (string, error) {406 var config map[string]interface{}407 decodeYAML(yamlString, &config)408 if config["main"] == nil {409 return "", errors.New("Model YAML must have \"main\"")410 }411 main := config["main"].(map[string]interface{})412 if main["type"] == nil {413 return "", errors.New("Model YAML main entrance must have \"type\"")414 }415 mainType := main["type"].(string)416 if mainType == "http" {417 return CreateDeployV2HTTP(client, user, name, tag, commit, config, replica)418 } else if mainType == "python" {419 return CreateDeployV2Python(client, user, name, tag, commit, config, replica)420 } else {421 return "", errors.New(fmt.Sprintf("Unknown entrance type %s", mainType))422 }423}424// TODO: Refractor needed. Code duplication with CreateDeployV2.425func CreateJobV1(client *kube.Clientset, commit string, yamlString string) (string, error) {426 // Load YAML configuration.427 var err error428 var config map[string]interface{}429 decodeYAML(yamlString, &config)430 // Get basic properties.431 user := config["user"].(string)432 repo := config["repo"].(string)433 image := config["image"].(string)434 workPath := config["work_path"].(string)435 // name := config["name"].(string)436 // tag := config["tag"].(string)437 // Create git worktree for the container.438 err = CreateRepoMirror(user, repo, commit)439 if err != nil {440 fmt.Println("Error: " + err.Error())441 }442 // Command to run in container.443 var command []string444 // Add code root. This is where the code repo sits.445 command = append(command, "--code_root")446 command = append(command, GetRepoMirrorPath(user, repo, commit))447 // Add asset root. This is where data / model weights sit.448 command = append(command, "--asset_root")449 command = append(command, GetAssetPath(user, repo, commit))450 // Add working path.451 command = append(command, "--work_path")452 command = append(command, workPath)453 // Add assets454 assetsInterface := config["assets"]455 if assetsInterface != nil && len(assetsInterface.([]interface{})) > 0 {456 command = append(command, "--assets")457 for _, asset := range assetsInterface.([]interface{}) {458 command = append(command, asset.(string))459 }460 }461 // Add command.462 commandInterface := config["cmd"]463 command = append(command, "--cmd")464 var cmd string465 for _, line := range commandInterface.([]interface{}) {466 cmd += line.(string) + " ; "467 }468 command = append(command, cmd)469 fmt.Println("command", command)470 jobName := GetJobName(user, repo, commit)471 args := struct {472 Name string473 Image string474 }{475 jobName,476 image,477 }478 var job batchv1.Job479 err = SpecFromTemplate(templateJobV1, args, &job)480 if err != nil {481 return "", err482 }483 job.Spec.Template.Spec.Containers[0].Args = command484 fmt.Println(job.Spec.Template.Spec)485 // Create job.486 jobClient := client.BatchV1().Jobs(kubeNamespace)487 result, err := jobClient.Create(&job)488 fmt.Println(result)489 fmt.Println(err)490 // Extract results.491 if err != nil {492 return "", err493 }494 return result.GetObjectMeta().GetName(), nil495}496// Get the Pod based on the given job.497func GetPodsByJobName(client *kube.Clientset, jobName string) ([]v1.Pod, error) {498 podClient := client.CoreV1().Pods(kubeNamespace)499 var options = metav1.ListOptions{500 LabelSelector: fmt.Sprintf("job-name=%s", jobName),501 }502 pods, err := podClient.List(options)503 return pods.Items, err504}505// Get the Pod based on the given deployment.506func GetPodsByDeployName(client *kube.Clientset, deployName string) ([]v1.Pod, error) {507 podClient := client.CoreV1().Pods(kubeNamespace)508 var options = metav1.ListOptions{509 LabelSelector: fmt.Sprintf("app=%s", deployName),510 }511 pods, err := podClient.List(options)512 return pods.Items, err513}514// Stream logs from a pod.515// Reference implementation: https://github.com/kubernetes/kubernetes/blob/c2e90cd1549dff87db7941544ce15f4c8ad0ba4c/pkg/kubectl/cmd/log.go#L186516func StreamLogsFromPod(client *kube.Clientset, podID string, follow bool, out io.Writer) error {517 logOptions := v1.PodLogOptions{518 Container: "main",519 Follow: follow,520 Previous: false,521 Timestamps: true,522 }523 req := client.CoreV1().RESTClient().Get().524 Namespace(kubeNamespace).525 Name(podID).526 Resource("pods").527 SubResource("log").528 Param("follow", strconv.FormatBool(logOptions.Follow)).529 Param("container", logOptions.Container).530 Param("previous", strconv.FormatBool(logOptions.Previous)).531 Param("timestamps", strconv.FormatBool(logOptions.Timestamps))532 if logOptions.SinceSeconds != nil {533 req.Param("sinceSeconds", strconv.FormatInt(*logOptions.SinceSeconds, 10))534 }535 if logOptions.SinceTime != nil {536 req.Param("sinceTime", logOptions.SinceTime.Format(time.RFC3339))537 }538 if logOptions.LimitBytes != nil {539 req.Param("limitBytes", strconv.FormatInt(*logOptions.LimitBytes, 10))540 }541 if logOptions.TailLines != nil {542 req.Param("tailLines", strconv.FormatInt(*logOptions.TailLines, 10))543 }544 readCloser, err := req.Stream()545 if err != nil {546 return err547 }548 defer readCloser.Close()549 // _, err = io.Copy(os.Stdout, readCloser)550 _, err = io.CopyBuffer(out, readCloser, make([]byte, 1))551 return err552}553// Stream logs from a job.554func StreamLogsFromJob(client *kube.Clientset, jobName string, follow bool, out io.Writer) error {555 pods, err := GetPodsByJobName(client, jobName)556 if err != nil {557 return err558 }559 if len(pods) == 0 {560 return errors.New(fmt.Sprintf("Cannot find job with given name: %s", jobName))561 }562 podId := pods[0].GetObjectMeta().GetName()563 return StreamLogsFromPod(client, podId, follow, out)564}565func StreamLogsFromModel(client *kube.Clientset, userId string, modelName string, tag string, commit string, follow bool, out io.Writer) error {566 deployName := GetDeployName(userId, modelName, tag, commit)567 pods, err := GetPodsByDeployName(client, deployName)568 if err != nil {569 return err570 }571 if len(pods) == 0 {572 return errors.New(fmt.Sprintf("Cannot find job with given name: %s", deployName))573 }574 podId := pods[0].GetObjectMeta().GetName()575 fmt.Println("Log from pod", podId)576 return StreamLogsFromPod(client, podId, follow, out)577}578func CreateService(client *kube.Clientset, deployName string) error {579 serviceSpec := &v1.Service{580 TypeMeta: metav1.TypeMeta{581 Kind: "Service",582 APIVersion: "v1beta1",583 },584 ObjectMeta: metav1.ObjectMeta{585 Name: deployName,586 },587 Spec: v1.ServiceSpec{588 Type: v1.ServiceTypeNodePort,589 Selector: map[string]string{"app": deployName},590 Ports: []v1.ServicePort{591 v1.ServicePort{592 Protocol: v1.ProtocolTCP,593 Port: 5900,594 TargetPort: intstr.IntOrString{595 Type: intstr.Int,596 IntVal: 5900,597 },598 },599 },600 },601 }602 services := client.CoreV1().Services(kubeNamespace)603 service, err := services.Get(deployName, metav1.GetOptions{})604 switch {605 case err == nil:606 serviceSpec.ObjectMeta.ResourceVersion = service.ObjectMeta.ResourceVersion607 serviceSpec.Spec.ClusterIP = service.Spec.ClusterIP608 _, err = services.Update(serviceSpec)609 case k8sErrors.IsNotFound(err):610 _, err = services.Create(serviceSpec)611 }612 return err613}614func AddServiceToIngress(client *kube.Clientset, path string, serviceName string) error {615 ingresses := client.ExtensionsV1beta1().Ingresses(kubeNamespace)616 ingress, err := ingresses.Get(ingressName, metav1.GetOptions{})617 modelBackend := &v1beta1Extensions.IngressBackend{618 ServiceName: serviceName,619 ServicePort: intstr.IntOrString{620 Type: intstr.Int,621 IntVal: 5900,622 },623 }624 if k8sErrors.IsNotFound(err) {625 defaultBackend := &v1beta1Extensions.IngressBackend{626 ServiceName: "default-http-backend",627 ServicePort: intstr.IntOrString{628 Type: intstr.Int,629 IntVal: 80,630 },631 }632 ingressSpec := &v1beta1Extensions.Ingress{633 TypeMeta: metav1.TypeMeta{634 Kind: "Ingress",635 APIVersion: "v1beta1",636 },637 ObjectMeta: metav1.ObjectMeta{638 Name: ingressName,639 Annotations: map[string]string{640 "ingress.kubernetes.io/ssl-redirect": "false",641 "kubernetes.io/ingress.class": "nginx",642 "ingress.kubernetes.io/rewrite-target": "/",643 "kubernetes.io/ingress.global-static-ip-name": "dummy-ingress",644 "ingress.kubernetes.io/proxy-body-size": "500m",645 "nginx/client_max_body_size": "500m",646 },647 },648 Spec: v1beta1Extensions.IngressSpec{649 Backend: defaultBackend,650 Rules: []v1beta1Extensions.IngressRule{651 v1beta1Extensions.IngressRule{652 IngressRuleValue: v1beta1Extensions.IngressRuleValue{653 HTTP: &v1beta1Extensions.HTTPIngressRuleValue{654 Paths: []v1beta1Extensions.HTTPIngressPath{655 v1beta1Extensions.HTTPIngressPath{656 Path: path,657 Backend: *modelBackend,658 },659 },660 },661 },662 },663 },664 },665 }666 _, err = ingresses.Create(ingressSpec)667 } else if err == nil {668 // check if the path already exists.669 paths := []v1beta1Extensions.HTTPIngressPath{}670 if len(ingress.Spec.Rules) > 0 {671 paths = ingress.Spec.Rules[0].IngressRuleValue.HTTP.Paths672 }673 for _, rule := range paths {674 if rule.Path == path {675 return fmt.Errorf("Ingress rule already exists for path %s", path)676 }677 }678 newPaths := append(paths,679 v1beta1Extensions.HTTPIngressPath{680 Path: path,681 Backend: *modelBackend,682 },683 )684 if len(ingress.Spec.Rules) == 0 {685 ingress.Spec.Rules = []v1beta1Extensions.IngressRule{686 v1beta1Extensions.IngressRule{687 IngressRuleValue: v1beta1Extensions.IngressRuleValue{688 HTTP: &v1beta1Extensions.HTTPIngressRuleValue{689 Paths: newPaths,690 },691 },692 },693 }694 } else {695 ingress.Spec.Rules[0].IngressRuleValue.HTTP.Paths = newPaths696 }697 _, err = ingresses.Update(ingress)698 }699 return err700}701func RemoveServiceFromIngress(client *kube.Clientset, path string) error {702 ingresses := client.ExtensionsV1beta1().Ingresses(kubeNamespace)703 ingress, err := ingresses.Get(ingressName, metav1.GetOptions{})704 if err == nil {705 paths := []v1beta1Extensions.HTTPIngressPath{}706 if len(ingress.Spec.Rules) == 0 {707 return errors.New("There is zero ingress rules. Cannot delete")708 }709 paths = ingress.Spec.Rules[0].IngressRuleValue.HTTP.Paths710 index := -1711 for i, rule := range paths {712 if rule.Path == path {713 index = i714 break715 }716 }717 if index == -1 {718 return errors.New(fmt.Sprintf("Path %s not found in ingress", path))719 }720 paths = append(paths[:index], paths[index+1:]...)721 fmt.Println("paths", paths)722 if len(paths) == 0 {723 ingress.Spec.Rules = []v1beta1Extensions.IngressRule{}724 } else {725 ingress.Spec.Rules[0].IngressRuleValue.HTTP.Paths = paths726 }727 _, err = ingresses.Update(ingress)728 }729 return err730}731func ListDeploy(client *kube.Clientset) ([]string, error) {732 var names []string733 deploymentClient := client.AppsV1beta1().Deployments(kubeNamespace)734 list, err := deploymentClient.List(metav1.ListOptions{LabelSelector: "app=dummy"})735 if err != nil {736 return names, err737 }738 for _, d := range list.Items {739 names = append(names, d.Name)740 }741 return names, nil742}743func TeardownDeploy(client *kube.Clientset, name string) error {744 deploymentClient := client.AppsV1beta1().Deployments(kubeNamespace)745 deletePolicy := metav1.DeletePropagationForeground746 err := deploymentClient.Delete(name,747 &metav1.DeleteOptions{748 PropagationPolicy: &deletePolicy,749 })750 return err751}752func TeardownService(client *kube.Clientset, name string) error {753 services := client.CoreV1().Services(kubeNamespace)754 err := services.Delete(name, &metav1.DeleteOptions{})755 return err756}...
executions.go
Source:executions.go
...277 w.Flush()278 }279 return280 }281 s.streamLogsFromJob(executionID, w)282 }))283 return nil284 }285}286// GetExecutionHandler returns test execution object for given test and execution id/name287func (s TestkubeAPI) GetExecutionHandler() fiber.Handler {288 return func(c *fiber.Ctx) error {289 ctx := c.Context()290 id := c.Params("id", "")291 executionID := c.Params("executionID")292 var execution testkube.Execution293 var err error294 if id == "" {295 execution, err = s.ExecutionResults.Get(ctx, executionID)296 if err == mongo.ErrNoDocuments {297 execution, err = s.ExecutionResults.GetByName(ctx, executionID)298 if err == mongo.ErrNoDocuments {299 return s.Error(c, http.StatusNotFound, fmt.Errorf("test with execution id/name %s not found", executionID))300 }301 }302 if err != nil {303 return s.Error(c, http.StatusInternalServerError, err)304 }305 } else {306 execution, err = s.ExecutionResults.GetByNameAndTest(ctx, executionID, id)307 if err == mongo.ErrNoDocuments {308 return s.Error(c, http.StatusNotFound, fmt.Errorf("test %s/%s not found", id, executionID))309 }310 if err != nil {311 return s.Error(c, http.StatusInternalServerError, err)312 }313 }314 execution.Duration = types.FormatDuration(execution.Duration)315 testSecretMap := make(map[string]string)316 if execution.TestSecretUUID != "" {317 testSecretMap, err = s.TestsClient.GetSecretTestVars(execution.TestName, execution.TestSecretUUID)318 if err != nil {319 return s.Error(c, http.StatusInternalServerError, err)320 }321 }322 testSuiteSecretMap := make(map[string]string)323 if execution.TestSuiteSecretUUID != "" {324 testSuiteSecretMap, err = s.TestsSuitesClient.GetSecretTestSuiteVars(execution.TestSuiteName, execution.TestSuiteSecretUUID)325 if err != nil {326 return s.Error(c, http.StatusInternalServerError, err)327 }328 }329 for key, value := range testSuiteSecretMap {330 testSecretMap[key] = value331 }332 for key, value := range testSecretMap {333 if variable, ok := execution.Variables[key]; ok && value != "" {334 variable.Value = string(value)335 variable.SecretRef = nil336 execution.Variables[key] = variable337 }338 }339 s.Log.Debugw("get test execution request - debug", "execution", execution)340 return c.JSON(execution)341 }342}343func (s TestkubeAPI) AbortExecutionHandler() fiber.Handler {344 return func(c *fiber.Ctx) error {345 ctx := c.Context()346 executionID := c.Params("executionID")347 execution, err := s.ExecutionResults.Get(ctx, executionID)348 if err == mongo.ErrNoDocuments {349 return s.Error(c, http.StatusNotFound, fmt.Errorf("test with execution id %s not found", executionID))350 }351 if err != nil {352 return s.Error(c, http.StatusInternalServerError, err)353 }354 result := s.Executor.Abort(executionID)355 s.Metrics.IncAbortTest(execution.TestType, result.IsFailed())356 return err357 }358}359func (s TestkubeAPI) GetArtifactHandler() fiber.Handler {360 return func(c *fiber.Ctx) error {361 executionID := c.Params("executionID")362 fileName := c.Params("filename")363 // TODO fix this someday :) we don't know 15 mins before release why it's working this way364 // remember about CLI client and Dashboard client too!365 unescaped, err := url.QueryUnescape(fileName)366 if err == nil {367 fileName = unescaped368 }369 unescaped, err = url.QueryUnescape(fileName)370 if err == nil {371 fileName = unescaped372 }373 //// quickfix end374 file, err := s.Storage.DownloadFile(executionID, fileName)375 if err != nil {376 return s.Error(c, http.StatusInternalServerError, err)377 }378 defer file.Close()379 return c.SendStream(file)380 }381}382// GetArtifacts returns list of files in the given bucket383func (s TestkubeAPI) ListArtifactsHandler() fiber.Handler {384 return func(c *fiber.Ctx) error {385 executionID := c.Params("executionID")386 files, err := s.Storage.ListFiles(executionID)387 if err != nil {388 return s.Error(c, http.StatusInternalServerError, err)389 }390 return c.JSON(files)391 }392}393func (s TestkubeAPI) GetExecuteOptions(namespace, id string, request testkube.ExecutionRequest) (options client.ExecuteOptions, err error) {394 // get test content from kubernetes CRs395 testCR, err := s.TestsClient.Get(id)396 if err != nil {397 return options, fmt.Errorf("can't get test custom resource %w", err)398 }399 test := testsmapper.MapTestCRToAPI(*testCR)400 if test.ExecutionRequest != nil {401 // Test variables lowest priority, then test suite, then test suite execution / test execution402 request.Variables = mergeVariables(test.ExecutionRequest.Variables, request.Variables)403 // Combine test executor args with execution args404 request.Args = append(request.Args, test.ExecutionRequest.Args...)405 request.Envs = mergeEnvs(request.Envs, test.ExecutionRequest.Envs)406 request.SecretEnvs = mergeEnvs(request.SecretEnvs, test.ExecutionRequest.SecretEnvs)407 if request.VariablesFile == "" && test.ExecutionRequest.VariablesFile != "" {408 request.VariablesFile = test.ExecutionRequest.VariablesFile409 }410 if request.HttpProxy == "" && test.ExecutionRequest.HttpProxy != "" {411 request.HttpProxy = test.ExecutionRequest.HttpProxy412 }413 if request.HttpsProxy == "" && test.ExecutionRequest.HttpsProxy != "" {414 request.HttpsProxy = test.ExecutionRequest.HttpsProxy415 }416 }417 // get executor from kubernetes CRs418 executorCR, err := s.ExecutorsClient.GetByType(testCR.Spec.Type_)419 if err != nil {420 return options, fmt.Errorf("can't get executor spec: %w", err)421 }422 var usernameSecret, tokenSecret *testkube.SecretRef423 if test.Content != nil && test.Content.Repository != nil {424 usernameSecret = test.Content.Repository.UsernameSecret425 tokenSecret = test.Content.Repository.TokenSecret426 }427 return client.ExecuteOptions{428 TestName: id,429 Namespace: namespace,430 TestSpec: testCR.Spec,431 ExecutorName: executorCR.ObjectMeta.Name,432 ExecutorSpec: executorCR.Spec,433 Request: request,434 Sync: request.Sync,435 Labels: testCR.Labels,436 UsernameSecret: usernameSecret,437 TokenSecret: tokenSecret,438 ImageOverride: request.Image,439 }, nil440}441// streamLogsFromResult writes logs from the output of executionResult to the writer442func (s *TestkubeAPI) streamLogsFromResult(executionResult *testkube.ExecutionResult, w *bufio.Writer) error {443 enc := json.NewEncoder(w)444 fmt.Fprintf(w, "data: ")445 s.Log.Debug("using logs from result")446 output := testkube.ExecutorOutput{447 Type_: output.TypeResult,448 Content: executionResult.Output,449 Result: executionResult,450 }451 err := enc.Encode(output)452 if err != nil {453 s.Log.Infow("Encode", "error", err)454 return err455 }456 fmt.Fprintf(w, "\n")457 w.Flush()458 return nil459}460// streamLogsFromJob streams logs in chunks to writer from the running execution461func (s *TestkubeAPI) streamLogsFromJob(executionID string, w *bufio.Writer) {462 enc := json.NewEncoder(w)463 s.Log.Debug("getting logs from Kubernetes job")464 logs, err := s.Executor.Logs(executionID)465 s.Log.Debugw("waiting for jobs channel", "channelSize", len(logs))466 if err != nil {467 output.PrintError(os.Stdout, err)468 s.Log.Errorw("getting logs error", "error", err)469 w.Flush()470 return471 }472 // loop through pods log lines - it's blocking channel473 // and pass single log output as sse data chunk474 for out := range logs {475 s.Log.Debugw("got log", "out", out)...
streamLogsFromJob
Using AI Code Generation
1import (2func main() {3 config, err := rest.InClusterConfig()4 if err != nil {5 panic(err.Error())6 }7 clientset, err := kubernetes.NewForConfig(config)8 if err != nil {9 panic(err.Error())10 }11 job := &batchv1.Job{12 ObjectMeta: metav1.ObjectMeta{13 },14 Spec: batchv1.JobSpec{15 Template: corev1.PodTemplateSpec{16 Spec: corev1.PodSpec{17 Containers: []corev1.Container{18 {19 Command: []string{"perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"},20 },21 },22 },23 },24 },25 }26 job, err = clientset.BatchV1().Jobs("default").Create(context.Background(), job, metav1.CreateOptions{})27 if err != nil {28 panic(err.Error())29 }30 fmt.Printf("Job created31 watch, err := clientset.BatchV1().Jobs("default").Watch(context.Background(), metav1.ListOptions{Watch: true})32 if err != nil {33 panic(err.Error())34 }35 ch := watch.ResultChan()36 for event := range ch {37 if event.Type == watch.Modified {38 job, ok := event.Object.(*batchv1.Job)39 if !ok {40 log.Fatal("Cannot convert to *batchv1.Job")41 }42 if job.Status.Succeeded > 0 {43 fmt.Printf("Job completed44 }45 }46 }
streamLogsFromJob
Using AI Code Generation
1import (2func main() {3 config, err := clientcmd.BuildConfigFromFlags("", "path/to/kubeconfig")4 if err != nil {5 config, err = rest.InClusterConfig()6 if err != nil {7 panic(err.Error())8 }9 }10 clientset, err := kubernetes.NewForConfig(config)11 if err != nil {12 panic(err.Error())13 }14 namespace, _, err := clientcmd.NewDefaultClientConfigLoadingRules().Load()15 if err != nil {16 panic(err.Error())17 }18 clientset, err = kubernetes.NewForConfig(config)19 if err != nil {20 panic(err.Error())21 }22 clientset, err = kubernetes.NewForConfig(config)23 if err != nil {24 panic(err.Error())25 }26 v1Clientset, err := clientset.CoreV1()27 if err != nil {28 panic(err.Error())29 }30 batchClientset, err := clientset.BatchV1()31 if err != nil {32 panic(err.Error())33 }34 deployment := &v1beta1.Deployment{35 ObjectMeta: v1.ObjectMeta{36 },37 Spec: v1beta1.DeploymentSpec{
streamLogsFromJob
Using AI Code Generation
1import (2func main() {3 config, err := clientcmd.BuildConfigFromFlags("", "/home/user/.kube/config")4 if err != nil {5 panic(err.Error())6 }7 clientset, err := kubernetes.NewForConfig(config)8 if err != nil {9 panic(err.Error())10 }11 job, err := clientset.BatchV1().Jobs("default").Get(context.TODO(), "job1", v1.GetOptions{})12 if err != nil {13 panic(err.Error())14 }15 pod, err := clientset.CoreV1().Pods("default").Get(context.TODO(), podName, v1.GetOptions{})16 if err != nil {17 panic(err.Error())18 }19 logs, err := clientset.CoreV1().Pods("default").GetLogs(pod.Name, &v1.PodLogOptions{}).Stream(context.TODO())20 if err != nil {21 panic(err.Error())22 }23 defer logs.Close()24 buf := new(bytes.Buffer)25 buf.ReadFrom(logs)26 fmt.Println(buf.String())27}
streamLogsFromJob
Using AI Code Generation
1import (2func main() {3 config, err := clientcmd.BuildConfigFromFlags("", "/home/ubuntu/.kube/config")4 if err != nil {5 log.Fatal(err)6 }7 clientset, err := kubernetes.NewForConfig(config)8 if err != nil {9 log.Fatal(err)10 }11 podList, err := clientset.CoreV1().Pods("default").List(metav1.ListOptions{})12 if err != nil {13 panic(err.Error())14 }15 for _, i := range podList.Items {16 fmt.Printf("Pod Name: %s17 fmt.Printf("Pod Status: %s18 fmt.Printf("Pod IP: %s19 fmt.Printf("Pod HostIP: %s20 fmt.Printf("Pod StartTime: %s21 fmt.Printf("Pod CreationTime: %s22 fmt.Printf("Pod RestartCount: %s23 fmt.Printf("Pod Ready: %s24 fmt.Printf("Pod Image: %s25 fmt.Printf("Pod ImageID: %s26 fmt.Printf("Pod State: %s27 fmt.Printf("Pod LastState: %s28 fmt.Printf("Pod Reason: %s29 fmt.Printf("Pod StartedAt: %s30 }31}
streamLogsFromJob
Using AI Code Generation
1import (2func main() {3 if os.Getenv("KUBECONFIG") != "" {4 config, err = clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))5 } else {6 config, err = rest.InClusterConfig()7 }8 if err != nil {9 log.Fatalf("Error building kubeconfig: %s", err.Error())10 }11 clientset, err := kubernetes.NewForConfig(config)12 if err != nil {13 log.Fatalf("Error building kubernetes clientset: %s", err.Error())14 }15 streamLogsFromJob(clientset, jobName, namespace)16}17func streamLogsFromJob(clientset *kubernetes.Clientset, jobName string, namespace string) {18 podName, err := getPodNameForJob(clientset, jobName, namespace)19 if err != nil {20 log.Fatalf("Error getting pod name for job %s: %s", jobName, err.Error())21 }22 streamLogsFromPod(clientset, podName, namespace)23}24func getPodNameForJob(clientset *kubernetes.Clientset, jobName string, namespace string) (string, error) {25 job, err := clientset.BatchV1().Jobs(namespace).Get(context.Background(), jobName, metav1.GetOptions{})26 if err != nil {27 }28}29func streamLogsFromPod(clientset *kubernetes.Clientset, podName string, namespace string) {30 pod, err := clientset.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{})31 if err != nil {32 log.Fatalf("Error getting pod %s: %s", podName, err.Error())33 }
streamLogsFromJob
Using AI Code Generation
1import (2func main() {3 kubeconfig = flag.String("kubeconfig", "/home/username/.kube/config", "absolute path to the kubeconfig file")4 config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)5 if err != nil {6 panic(err.Error())7 }8 clientset, err := kubernetes.NewForConfig(config)9 if err != nil {10 panic(err.Error())11 }12 podClient := clientset.CoreV1().Pods("default")13 pod, err := podClient.Get("podname", meta_v1.GetOptions{})14 if err != nil {15 panic(err.Error())16 }17 streamLogsFromJob(clientset, pod)18}19func streamLogsFromJob(clientset *kubernetes.Clientset, pod *v1.Pod) {20 req := clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &v1.PodLogOptions{})21 stream, err := req.Stream()22 if err != nil {23 panic(err.Error())24 }25 defer stream.Close()26 io.Copy(os.Stdout, stream)27}
streamLogsFromJob
Using AI Code Generation
1import (2func streamLogsFromJob(ctx context.Context, job *v1.DeploymentConfig, config *rest.Config) error {3 client, err := v1.NewForConfig(config)4 if err != nil {5 }6 fmt.Println("Streaming logs from job...")7 logs, err := client.DeploymentConfigs("test").Watch(metav1.ListOptions{})8 if err != nil {9 }10 for {11 select {12 case <-ctx.Done():13 case event := <-logs.ResultChan():14 fmt.Println(event.Object)15 }16 }17}18func main() {19 kubeconfig := flag.String("kubeconfig", "", "absolute path to the kubeconfig file")20 flag.Parse()21 config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)22 if err != nil {23 log.Fatal(err)24 }25 clientset, err := versioned.NewForConfig(config)26 if err != nil {27 log.Fatal(err)28 }
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!