Best Testkube code snippet using client.TailPodLogs
manager.go
Source:manager.go
1package kubelogs2import (3 "bufio"4 "context"5 "errors"6 "fmt"7 "strings"8 "sync"9 "time"10 v1 "k8s.io/api/core/v1"11 apierrors "k8s.io/apimachinery/pkg/api/errors"12 "k8s.io/apimachinery/pkg/api/meta"13 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"14 "k8s.io/client-go/informers"15 "k8s.io/client-go/kubernetes"16 listerv1 "k8s.io/client-go/listers/core/v1"17 "k8s.io/client-go/tools/cache"18 "github.com/ripta/axe/pkg/logger"19)20var ErrInformerNeverSynced = errors.New("informer cache never completed syncing")21type Manager struct {22 kubernetes.Interface23 debug bool24 l logger.Interface25 logCh chan logger.LogLine26 mu sync.Mutex27 nsCancelers map[string]context.CancelFunc28 nsInformers map[string]informers.SharedInformerFactory29 podLogCancelers map[string]context.CancelFunc30 containerTails map[string]bool31 lookback time.Duration32 resync time.Duration33}34func NewManager(l logger.Interface, cs kubernetes.Interface, lookback, resync time.Duration, debug bool) *Manager {35 if lookback > 0 {36 lookback = -lookback37 }38 if lookback == 0 {39 lookback = -5 * time.Minute40 }41 return &Manager{42 Interface: cs,43 debug: debug,44 l: l,45 mu: sync.Mutex{},46 logCh: make(chan logger.LogLine, 1000),47 containerTails: make(map[string]bool),48 nsCancelers: make(map[string]context.CancelFunc),49 nsInformers: make(map[string]informers.SharedInformerFactory),50 podLogCancelers: make(map[string]context.CancelFunc),51 lookback: lookback,52 resync: resync,53 }54}55func (m *Manager) ContainerCount() (int, int) {56 var active, all int57 m.mu.Lock()58 defer m.mu.Unlock()59 for _, up := range m.containerTails {60 all += 161 if up {62 active += 163 }64 }65 return active, all66}67func (m *Manager) Logs() <-chan logger.LogLine {68 return m.logCh69}70func (m *Manager) NamespaceCount() int {71 m.mu.Lock()72 defer m.mu.Unlock()73 return len(m.nsInformers)74}75func (m *Manager) Run(ctx context.Context) error {76 m.mu.Lock()77 defer m.mu.Unlock()78 for ns, inf := range m.nsInformers {79 ctx, cancel := context.WithCancel(ctx)80 m.nsCancelers[ns] = cancel81 inf.Start(ctx.Done())82 }83 return m.unsafeWaitForCacheSync(ctx.Done())84}85func (m *Manager) WaitForCacheSync(stopCh <-chan struct{}) error {86 m.mu.Lock()87 defer m.mu.Unlock()88 return m.unsafeWaitForCacheSync(stopCh)89}90func (m *Manager) unsafeWaitForCacheSync(stopCh <-chan struct{}) error {91 for ns, inf := range m.nsInformers {92 for typ, ok := range inf.WaitForCacheSync(stopCh) {93 if !ok {94 return fmt.Errorf("%w for type %s in namespace %s", ErrInformerNeverSynced, typ.String(), ns)95 }96 }97 m.l.Printf("cache synced for namespace %s", ns)98 }99 return nil100}101func (m *Manager) Watch(namespace string) {102 m.mu.Lock()103 defer m.mu.Unlock()104 if _, ok := m.nsInformers[namespace]; !ok {105 inf := informers.NewSharedInformerFactoryWithOptions(m.Interface, m.resync, informers.WithNamespace(namespace))106 inf.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{107 AddFunc: func(newobj interface{}) {108 om, err := meta.Accessor(newobj)109 if err != nil {110 m.l.Printf("could not retrieve meta information from new object during add: %+v", err)111 return112 }113 m.startPodLogs(om.GetNamespace(), om.GetName())114 },115 UpdateFunc: func(_, newobj interface{}) {116 om, err := meta.Accessor(newobj)117 if err != nil {118 m.l.Printf("could not retrieve meta information from new object during update: %+v", err)119 return120 }121 m.startPodLogs(om.GetNamespace(), om.GetName())122 },123 DeleteFunc: func(oldobj interface{}) {124 om, err := meta.Accessor(oldobj)125 if err != nil {126 m.l.Printf("could not retrieve meta information from old object during delete: %+v", err)127 return128 }129 m.stopPodLogs(om.GetNamespace(), om.GetName())130 },131 })132 m.l.Printf("registered watch for namespace %s", namespace)133 m.nsInformers[namespace] = inf134 }135}136func (m *Manager) Unwatch(namespace string) {137 m.mu.Lock()138 defer m.mu.Unlock()139 if _, ok := m.nsInformers[namespace]; ok {140 if cancel, ok := m.nsCancelers[namespace]; ok {141 cancel()142 }143 m.l.Printf("stopped watching namespace %s", namespace)144 delete(m.nsInformers, namespace)145 delete(m.nsCancelers, namespace)146 stops := make([]string, 0)147 for key := range m.podLogCancelers {148 if strings.HasPrefix(key, namespace+"/") {149 stops = append(stops, key)150 }151 }152 for _, key := range stops {153 if cancel, ok := m.podLogCancelers[key]; ok {154 cancel()155 }156 m.l.Printf("stopped tailing logs for %s", key)157 delete(m.podLogCancelers, key)158 }159 }160}161func (m *Manager) stopPodLogs(ns, name string) {162 m.mu.Lock()163 defer m.mu.Unlock()164 m.l.Printf("stopping pod logs for %s/%s", ns, name)165 key := fmt.Sprintf("%s/%s", ns, name)166 if cancel, ok := m.podLogCancelers[key]; ok {167 cancel()168 }169 delete(m.podLogCancelers, key)170}171func (m *Manager) startPodLogs(ns, name string) {172 m.mu.Lock()173 defer m.mu.Unlock()174 key := fmt.Sprintf("%s/%s", ns, name)175 if _, ok := m.podLogCancelers[key]; ok {176 return177 }178 ctx, cancel := context.WithCancel(context.Background())179 m.podLogCancelers[key] = cancel180 go m.tailPodLogs(ctx, ns, name)181}182func (m *Manager) tailPodLogs(ctx context.Context, ns, name string) {183 m.l.Printf("starting tail of logs for pod %s/%s", ns, name)184 defer m.stopPodLogs(ns, name)185 inf, ok := m.nsInformers[ns]186 if !ok {187 m.l.Printf("could not tail logs for %s/%s, because its namespace informer is missing", ns, name)188 return189 }190 pl := inf.Core().V1().Pods().Lister()191 pod, err := pl.Pods(ns).Get(name)192 if err != nil {193 if apierrors.IsNotFound(err) {194 m.l.Printf("ignoring deleted pod %s/%s", ns, name)195 return196 }197 }198 wg := sync.WaitGroup{}199 // TODO(ripta): handle init containers, which means we need to re-enter tailPodLogs200 for _, container := range pod.Spec.Containers {201 wg.Add(1)202 go m.tailPodContainerLogs(ctx, pl, ns, name, container.Name)203 }204 wg.Wait()205}206func (m *Manager) tailPodContainerLogs(ctx context.Context, pl listerv1.PodLister, ns, name, cn string) {207 key := fmt.Sprintf("%s/%s/%s", ns, name, cn)208 m.mu.Lock()209 m.containerTails[key] = true210 m.mu.Unlock()211 defer func() {212 m.mu.Lock()213 m.containerTails[key] = false214 m.mu.Unlock()215 }()216 m.l.Printf("starting tail of logs for container %s", key)217 plo := v1.PodLogOptions{218 Container: cn,219 Follow: true,220 SinceTime: &metav1.Time{221 Time: time.Now().Add(m.lookback),222 },223 }224 for {225 if _, err := pl.Pods(ns).Get(name); err != nil {226 if apierrors.IsTooManyRequests(err) {227 // TODO(ripta): add jitter228 time.Sleep(5 * time.Second)229 m.l.Printf("got throttled by apiserver while asking about container %s", key)230 continue231 }232 if apierrors.IsNotFound(err) {233 m.l.Printf("ignoring container %s belonging to deleted pod %s/%s", cn, ns, name)234 return235 }236 }237 req := m.Interface.CoreV1().Pods(ns).GetLogs(name, &plo)238 stream, err := req.Context(ctx).Stream()239 if err != nil {240 // TODO(ripta): add jitter241 time.Sleep(5 * time.Second)242 m.l.Printf("could not tail container %s: %+v", key, err)243 continue244 }245 defer stream.Close()246 // lag := time.NewTimer(time.Millisecond)247 // defer lag.Stop()248 m.l.Printf("streaming logs for container %s", key)249 m.logCh <- logger.LogLine{250 Type: logger.LogLineTypeAxe,251 Text: fmt.Sprintf("streaming logs for container %s", key),252 }253 scanner := bufio.NewScanner(stream)254 for scanner.Scan() {255 line := logger.LogLine{256 Type: logger.LogLineTypeContainer,257 Namespace: ns,258 Name: name,259 Text: scanner.Text(),260 }261 select {262 case m.logCh <- line:263 // case <-lag.C:264 // m.l.Printf("event buffer full, dropping logs for %s/%s", ns, name)265 case <-ctx.Done():266 m.l.Printf("stopped tailing container %s", key)267 return268 }269 }270 plo.SinceTime.Time = time.Now()271 m.l.Printf("end of logs for container %s", key)272 if err := scanner.Err(); err != nil {273 m.l.Printf("error tailing container %s: %+v", key, err)274 }275 // TODO(ripta): add jitter276 time.Sleep(5 * time.Second)277 }278}...
inspect.go
Source:inspect.go
...78 ui.Debug("pod", pod.GetNamespace()+"/"+pod.GetName(), "status", string(pod.Status.Phase))79 switch pod.Status.Phase {80 case corev1.PodRunning:81 logrus.Debug("tailing pod logs: immediately")82 return c.executor.TailPodLogs(ctx, pod, logs)83 case corev1.PodFailed:84 err := fmt.Errorf("can't get pod logs, pod failed: %s/%s", pod.Namespace, pod.Name)85 logrus.Error(err.Error())86 return c.GetLastLogLineError(ctx, pod)87 default:88 logrus.Debug("tailing job logs: waiting for pod to be ready")89 if err = wait.PollImmediate(pollInterval, pollTimeout, IsPodReady(ctx, c.client, client.ObjectKeyFromObject(&pod))); err != nil {90 ui.ExitOnError("poll immediate error when tailing logs", err)91 return c.GetLastLogLineError(ctx, pod)92 }93 logrus.Debug("tailing pod logs")94 return c.executor.TailPodLogs(ctx, pod, logs)95 }96 }97 }98 return99}100// GetLastLogLineError return error if last line is failed101func (c TestInspectionClient) GetLastLogLineError(ctx context.Context, pod corev1.Pod) error {102 log, err := c.GetPodLogError(ctx, pod)103 if err != nil {104 return fmt.Errorf("getPodLogs error: %w", err)105 }106 logrus.Debug("log", "got last log bytes", string(log)) // in case distorted log bytes107 entry, err := output.GetLogEntry(log)108 if err != nil {...
executor.go
Source:executor.go
...136 }137 return logs, nil138}139*/140func (e *Executor) TailPodLogs(ctx context.Context, pod corev1.Pod, logs chan []byte) (err error) {141 count := int64(1)142 var containers []string143 for _, container := range pod.Spec.InitContainers {144 containers = append(containers, container.Name)145 }146 for _, container := range pod.Spec.Containers {147 containers = append(containers, container.Name)148 }149 // go func() {150 defer close(logs)151 for _, container := range containers {152 podLogOptions := corev1.PodLogOptions{153 Follow: true,154 TailLines: &count,155 Container: container,156 }157 podLogRequest := e.KubeClient.CoreV1().158 Pods(pod.GetNamespace()).159 GetLogs(pod.GetName(), &podLogOptions)160 stream, err := podLogRequest.Stream(ctx)161 if err != nil {162 logrus.Error("stream error", "error", err)163 continue164 }165 scanner := bufio.NewScanner(stream)166 // set default bufio scanner buffer (to limit bufio.Scanner: token too long errors on very long lines)167 buf := make([]byte, 0, 64*1024)168 scanner.Buffer(buf, 1024*1024)169 for scanner.Scan() {170 logrus.Debug("TailPodLogs stream scan", "out", scanner.Text(), "pod", pod.Name)171 logs <- scanner.Bytes()172 }173 if scanner.Err() != nil {174 return errors.Wrapf(scanner.Err(), "scanner error")175 }176 }177 // }()178 return179}...
TailPodLogs
Using AI Code Generation
1import (2func main() {3 clientset := createClientSet()4 clientset := createFakeClientSet()5 clientset := createFakeClientSetWithRecorder()6 clientset := createFakeClientSetWithRESTConfig()7 clientset := createFakeClientSetWithRESTConfigAndScheme()8 clientset := createFakeClientSetWithRESTConfigAndSchemeAndRecorder()9 clientset := createFakeClientSetWithRESTConfigAndSchemeAndRecorderAndCorev1()10 clientset := createFakeClientSetWithRESTConfigAndSchemeAndRecorderAndCorev1AndFakeClientset()11 clientset := createFakeClientSetWithRESTConfigAndSchemeAndRecorderAndCorev1AndFakeClientsetAndPod()12 clientset := createClientSetWithRESTConfig()13 clientset := createFakeClientSetWithRESTConfig()
TailPodLogs
Using AI Code Generation
1func main() {2 config, err := clientcmd.BuildConfigFromFlags("", "/home/user/.kube/config")3 if err != nil {4 panic(err.Error())5 }6 clientset, err := kubernetes.NewForConfig(config)7 if err != nil {8 panic(err.Error())9 }10 client := clientset.CoreV1().RESTClient()11 request := client.Get().Namespace("default").Resource("pods").Name("my-pod").SubResource("log").Param("follow", "true")12 watch, err := request.Stream()13 if err != nil {14 panic(err.Error())15 }16 defer watch.Close()17 buf := new(bytes.Buffer)18 scanner := bufio.NewScanner(watch)19 for scanner.Scan() {20 buf.Write(scanner.Bytes())21 fmt.Println(buf.String())22 buf.Reset()23 }24}
TailPodLogs
Using AI Code Generation
1import (2func main() {3 config, err := rest.InClusterConfig()4 if err != nil {5 config, err = clientcmd.BuildConfigFromFlags("", "C:\\Users\\sudhanshu\\.kube\\config")6 if err != nil {7 panic(err.Error())8 }9 }10 clientset, err := kubernetes.NewForConfig(config)11 if err != nil {12 panic(err.Error())13 }14 logs, err := clientset.CoreV1().Pods("default").GetLogs("myapp-pod", &v1.PodLogOptions{}).Stream()15 if err != nil {16 panic(err.Error())17 }18 defer logs.Close()19 buf := new(bytes.Buffer)20 buf.ReadFrom(logs)21 newStr := buf.String()22 fmt.Println(newStr)23}24import (25func main() {26 config, err := rest.InClusterConfig()27 if err != nil {28 config, err = clientcmd.BuildConfigFromFlags("", "C:\\Users\\sudhanshu\\.kube\\config")29 if err != nil {30 panic(err.Error())31 }32 }33 clientset, err := kubernetes.NewForConfig(config)34 if err != nil {35 panic(err.Error())36 }37 watcher, err := watch.NewStreamWatcher(38 &watch.HTTPWatcher{39 Client: clientset.CoreV1().RESTClient(),40 Codec: runtime.NewParameterCodec(clientset.CoreV1().RESTClient().GetConfig()),41 },
TailPodLogs
Using AI Code Generation
1import (2func main() {3 config, err := rest.InClusterConfig()4 if err != nil {5 panic(err.Error())6 }7 clientset, err := kubernetes.NewForConfig(config)8 if err != nil {9 panic(err.Error())10 }11 for {12 watcher, err := clientset.CoreV1().Pods("").Watch(context.Background(), v1.ListOptions{})13 if err != nil {14 panic(err.Error())15 }16 for event := range watcher.ResultChan() {17 if event.Type == watch.Added {18 pod := event.Object.(*v1.Pod)19 fmt.Printf("%s %s %s20 if pod.Status.Phase == "Running" {21 fmt.Printf("Pod %s is running22 go func(pod *v1.Pod) {23 req := clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &v1.PodLogOptions{})24 podLogs, err := req.Stream(context.Background())25 if err != nil {26 panic(err.Error())27 }28 defer podLogs.Close()29 buf := make([]byte, 1024)30 for {31 n, err := podLogs.Read(buf)32 if err != nil && err != io.EOF {33 log.Fatal(err)34 }35 if n == 0 {36 }37 fmt.Printf("%s", string(buf[:n]))38 }39 }(pod)40 }41 }42 }43 time.Sleep(5 * time.Second)44 }45}
TailPodLogs
Using AI Code Generation
1import (2func main() {3 config, err := rest.InClusterConfig()4 if err != nil {5 fmt.Println("Failed to get in-cluster config. Trying to load admin.kubeconfig file.")6 kubeconfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(7 &clientcmd.ClientConfigLoadingRules{ExplicitPath: "/root/admin.kubeconfig"},8 &clientcmd.ConfigOverrides{},9 config, err = kubeconfig.ClientConfig()10 if err != nil {11 panic(err)12 }13 }14 appsClient, err := versioned.NewForConfig(config)15 if err != nil {16 panic(err)17 }18 coreClient, err := kubernetes.NewForConfig(config)19 if err != nil {20 panic(err)21 }22 dep, err := appsClient.AppsV1().DeploymentConfigs("default").Get(context.Background(), "myapp", metav1.GetOptions{})23 if err != nil {
TailPodLogs
Using AI Code Generation
1import (2func main() {3 config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)4 if err != nil {5 log.Fatal(err)6 }7 clientset, err := kubernetes.NewForConfig(config)8 if err != nil {9 log.Fatal(err)10 }11 pods, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})12 if err != nil {13 log.Fatal(err)14 }15 for _, pod := range pods.Items {16 fmt.Printf("Pod Name: %s17 }18 clientset.CoreV1().Pods(namespace).GetLogs(podName, &v1.PodLogOptions{}).Stream()19}20import (21func main() {22 config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)23 if err != nil {24 log.Fatal(err)25 }26 clientset, err := kubernetes.NewForConfig(config)27 if err != nil {28 log.Fatal(err)29 }30 pods, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})31 if err != nil {32 log.Fatal(err)33 }
TailPodLogs
Using AI Code Generation
1import (2func main() {3 config, err := rest.InClusterConfig()4 if err != nil {5 kubeconfig := flag.String("kubeconfig", "", "absolute path to the kubeconfig file")6 flag.Parse()7 config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)8 if err != nil {9 panic(err.Error())10 }11 }12 clientset, err := kubernetes.NewForConfig(config)13 if err != nil {14 panic(err.Error())15 }16 logs, err := clientset.CoreV1().Pods(namespace).GetLogs(podName, &v1.PodLogOptions{}).DoRaw()17 if err != nil {18 panic(err.Error())19 }20 fmt.Println(string(logs))21 logs, err = clientset.CoreV1().Pods(namespace).GetLogs(podName, &v1.PodLogOptions{22 }).DoRaw()23 if err != nil {24 panic(err.Error())25 }26 fmt.Println(string(logs))27 logFile, err := os.Create("pod-logs.txt")28 if err != nil {29 log.Fatal(err)30 }31 defer logFile.Close()32 logs, err = clientset.CoreV1().Pods(namespace).GetLogs(podName, &v1.PodLogOptions{33 }).Stream()34 if err != nil {35 panic(err.Error())36 }37 defer logs.Close()38 _, err = io.Copy(logFile, logs)39 if err != nil {40 log.Fatal(err)41 }42}
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!