kubeletとCRIの処理を雑に確認したメモ。
podの中身
pod内のアプリコンテナ間では、mnt,uts,pid以外のnamespaceが共有されています。
pod内ではアプリコンテナだけでなくpauseコンテナも動いています。
https://speakerdeck.com/devinjeon/kubernetes-pod-internals-with-the-fundamentals-of-containers
pauseコンテナには、ipc,network namespaceをアプリコンテナに提供し、アプリコンテナの障害や再起動時にもネットワーク設定を維持する役割があるらしいです。
このようにpodでは、(複数の)アプリコンテナ+pauseコンテナが動作していて、ipc,network namespaceを共有している状態になります。
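実際に共有されているかは、ノード上でpauseコンテナとアプリコンテナのプロセスの/proc/<pid>/ns/*を見比べると確認できます。以下はそのための雑なGoスケッチで、PIDはpsやcrictl inspectなどで調べた値に置き換える想定です(ここでは仮の値)。

package main

// 同一pod内のpauseコンテナとアプリコンテナのプロセスについて、
// /proc/<pid>/ns/* のinodeを見比べてnamespace共有を確認する雑なスケッチ。
// PIDは仮の値で、ノード上で実際のPIDに置き換える想定。
import (
	"fmt"
	"os"
)

func nsLink(pid int, ns string) string {
	link, err := os.Readlink(fmt.Sprintf("/proc/%d/ns/%s", pid, ns))
	if err != nil {
		return "error: " + err.Error()
	}
	return link
}

func main() {
	pausePID, appPID := 1234, 5678 // 仮のPID
	for _, ns := range []string{"net", "ipc", "uts", "mnt", "pid"} {
		// net, ipc は同じinodeに、mnt, pid などは別のinodeになるはず
		fmt.Printf("%-4s pause=%-20s app=%-20s\n", ns, nsLink(pausePID, ns), nsLink(appPID, ns))
	}
}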
pod作成概要
k8sのworker nodeでは、kubeletがCRIという規格(gRPC API)を介してコンテナランタイムにポッドの作成を依頼します。
ポッドの作成に関連する主なCRIは下記になります。
1.RunPodSandboxでpod sandbox(pauseコンテナ)を作成・起動
2.CreateContainerでアプリコンテナを作成
3.StartContainerでアプリコンテナを起動
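この3つを順に呼ぶだけの雑なGoスケッチを置いておきます。エンドポイントのパスやconfigの中身は仮置きで、アプリコンテナのイメージはpull済みの前提です(実際はこの後で見るkubeletのコード経由で呼ばれます)。

package main

// RunPodSandbox → CreateContainer → StartContainer の呼び出し順を確認するだけの雑なスケッチ。
import (
	"context"
	"time"

	"google.golang.org/grpc"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	// containerdのCRIエンドポイントへ接続(パスは環境依存の仮置き)
	conn, err := grpc.Dial("unix:///run/containerd/containerd.sock", grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	client := runtimeapi.NewRuntimeServiceClient(conn)

	sandboxConfig := &runtimeapi.PodSandboxConfig{
		Metadata: &runtimeapi.PodSandboxMetadata{Name: "demo", Uid: "demo-uid", Namespace: "default"},
	}

	// 1. RunPodSandboxでpod sandbox(pauseコンテナ)を作成・起動
	sb, err := client.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{Config: sandboxConfig})
	if err != nil {
		panic(err)
	}

	// 2. CreateContainerでsandbox内にアプリコンテナを作成(イメージはpull済み前提)
	created, err := client.CreateContainer(ctx, &runtimeapi.CreateContainerRequest{
		PodSandboxId: sb.PodSandboxId,
		Config: &runtimeapi.ContainerConfig{
			Metadata: &runtimeapi.ContainerMetadata{Name: "app"},
			Image:    &runtimeapi.ImageSpec{Image: "docker.io/library/nginx:latest"},
		},
		SandboxConfig: sandboxConfig,
	})
	if err != nil {
		panic(err)
	}

	// 3. StartContainerでアプリコンテナを起動
	if _, err := client.StartContainer(ctx, &runtimeapi.StartContainerRequest{ContainerId: created.ContainerId}); err != nil {
		panic(err)
	}
}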
コンテナ作成まわりは以前のcontainerdとruncの記事で触れたので、今回はポッド作成処理の1(RunPodSandbox)を雑に追います。
RunPodSandboxは文字どおりポッド環境を作成するCRIで、ポッドネットワーク等を整備する処理になっているようです。
なお、コンテナランタイムによってポッド(sandbox)の実現方法は異なるため、Kata ContainersのようなVMM型とcontainerd(runc)のようなkernel共有型とでは処理が異なるようです。
service RuntimeService {
    // Sandbox operations.
    rpc RunPodSandbox(RunPodSandboxRequest) returns (RunPodSandboxResponse) {}
    rpc StopPodSandbox(StopPodSandboxRequest) returns (StopPodSandboxResponse) {}
    rpc RemovePodSandbox(RemovePodSandboxRequest) returns (RemovePodSandboxResponse) {}
    rpc PodSandboxStatus(PodSandboxStatusRequest) returns (PodSandboxStatusResponse) {}
    rpc ListPodSandbox(ListPodSandboxRequest) returns (ListPodSandboxResponse) {}

    // Container operations.
    rpc CreateContainer(CreateContainerRequest) returns (CreateContainerResponse) {}
    rpc StartContainer(StartContainerRequest) returns (StartContainerResponse) {}
    rpc StopContainer(StopContainerRequest) returns (StopContainerResponse) {}
    rpc RemoveContainer(RemoveContainerRequest) returns (RemoveContainerResponse) {}
    rpc ListContainers(ListContainersRequest) returns (ListContainersResponse) {}
    rpc ContainerStatus(ContainerStatusRequest) returns (ContainerStatusResponse) {}
    ...
}
ポッド作成の流れ
kubeletのポッド作成は下記のようなフローで行われるようです。
1.ポッド作成のイベントを受信するとHandlePodAdditionsが呼び出される。
2.最終的にkubeRuntimeManager.SyncPodから、dockershimなどのCRI実装経由でポッド(sandbox)の作成やコンテナの作成が行われる。
syncLoopIteration
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
			return false
		}

		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", format.Pods(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", format.Pods(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", format.Pods(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", format.Pods(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", format.Pods(u.Pods))
			// DELETE is treated as a UPDATE because of graceful deletion.
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.SET:
			// TODO: Do we want to support this?
			klog.ErrorS(nil, "Kubelet does not support snapshot update")
		default:
			klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
		}
HandlePodAdditions
HandlePodAdditions→dispatchWork→podWorkers.UpdatePodの順でポッド作成処理が呼び出されます。
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	start := kl.clock.Now()
	sort.Sort(sliceutils.PodsByCreationTime(pods))
	for _, pod := range pods {
		existingPods := kl.podManager.GetPods()
		kl.podManager.AddPod(pod)

		if kubetypes.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}

		if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
			// We failed pods that we rejected, so activePods include all admitted
			// pods that are alive.
			activePods := kl.filterOutInactivePods(existingPods)

			// Check if we can admit the pod; if not, reject it.
			if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
				kl.rejectPod(pod, reason, message)
				continue
			}
		}
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		// 非同期のpod起動処理を実行
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
		// ポッドが起動するコンテナそれぞれのStartup/Liveness/Readinessのprobe workerを起動
		kl.probeManager.AddPod(pod)
	}
}
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	// Run the sync in an async worker.
	kl.podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		StartTime:  start,
	})
	// Note the number of containers for new pods.
	if syncType == kubetypes.SyncPodCreate {
		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
	}
}
UpdatePod
続いてUpdatePod→managePodLoop(goroutine)→syncPodFnと処理が移り、syncPodFnが実際のポッド・コンテナの起動処理を進めていきます。
syncPodFn(実体はKubelet.syncPod)では、ポッドのstatusをAPI serverと同期したり、pod用のデータディレクトリを作成したりしているらしいです。
そして、なんやかんやでkubeRuntimeManager.SyncPodを呼び出します。
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
	...
	if podUpdates, exists = p.podUpdates[uid]; !exists {
		// We need to have a buffer here, because checkForUpdates() method that
		// puts an update into channel is called from the same goroutine where
		// the channel is consumed. However, it is guaranteed that in such case
		// the channel is empty, so buffer of size 1 is enough.
		podUpdates = make(chan podWork, 1)
		p.podUpdates[uid] = podUpdates

		// Creating a new pod worker either means this is a new pod, or that the
		// kubelet just restarted. In either case the kubelet is willing to believe
		// the status of the pod for the first pod worker sync. See corresponding
		// comment in syncPod.
		go func() {
			defer runtime.HandleCrash()
			p.managePodLoop(podUpdates)
		}()
	}
	...
func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) { var lastSyncTime time.Time for update := range podUpdates { pod := update.Options.Pod klog.V(4).InfoS("Processing pod event", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType) err := func() error { // The worker is responsible for ensuring the sync method sees the appropriate // status updates on resyncs (the result of the last sync), transitions to // terminating (no wait), or on terminated (whatever the most recent state is). // Only syncing and terminating can generate pod status changes, while terminated // pods ensure the most recent status makes it to the api server. var status *kubecontainer.PodStatus var err error switch { case update.Options.RunningPod != nil: // when we receive a running pod, we don't need status at all default: // wait until we see the next refresh from the PLEG via the cache (max 2s) // TODO: this adds ~1s of latency on all transitions from sync to terminating // to terminated, and on all termination retries (including evictions). We should // improve latency by making the the pleg continuous and by allowing pod status // changes to be refreshed when key events happen (killPod, sync->terminating). // Improving this latency also reduces the possibility that a terminated // container's status is garbage collected before we have a chance to update the // API server (thus losing the exit code). status, err = p.podCache.GetNewerThan(pod.UID, lastSyncTime) } if err != nil { // This is the legacy event thrown by manage pod loop all other events are now dispatched // from syncPodFn p.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err) return err } ctx := p.contextForWorker(pod.UID) // Take the appropriate action (illegal phases are prevented by UpdatePod) switch { case update.WorkType == TerminatedPodWork: err = p.syncTerminatedPodFn(ctx, pod, status) case update.WorkType == TerminatingPodWork: ... default: err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status) } lastSyncTime = time.Now() return err }() ... }
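上のUpdatePod/managePodLoopがやっていることの雰囲気だけを単純化すると、pod UIDごとにバッファ1のchannelとworker goroutineを持ち、そこにupdateを流し込んでsyncPod相当を呼ぶ、という形になります。以下は雑なスケッチで、実物の型・フィールドとは別物ですし、未配送updateの持ち替えや終了処理は省略しています。

package main

// podWorkersの雰囲気だけを単純化したスケッチ(実物とは別物)。
import (
	"fmt"
	"sync"
	"time"
)

type work struct{ podName string }

type workers struct {
	mu      sync.Mutex
	updates map[string]chan work // key: pod UID
}

func (w *workers) UpdatePod(uid string, u work) {
	w.mu.Lock()
	ch, ok := w.updates[uid]
	if !ok {
		ch = make(chan work, 1) // 実装コメントにあるとおりバッファ1
		w.updates[uid] = ch
		go func() { // このpod専用のworker goroutine
			for u := range ch {
				syncPod(u) // 実物ではここがsyncPodFn(Kubelet.syncPod)に相当
			}
		}()
	}
	w.mu.Unlock()
	ch <- u
}

func syncPod(u work) { fmt.Println("sync", u.podName) }

func main() {
	w := &workers{updates: map[string]chan work{}}
	w.UpdatePod("uid-1", work{podName: "nginx"})
	time.Sleep(100 * time.Millisecond) // worker goroutineの処理を雑に待つ
}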
syncPodFn
func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error { ... // Record pod worker start latency if being created // TODO: make pod workers record their own latencies if updateType == kubetypes.SyncPodCreate { if !firstSeenTime.IsZero() { // This is the first time we are syncing the pod. Record the latency // since kubelet first saw the pod if firstSeenTime is set. metrics.PodWorkerStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime)) } else { klog.V(3).InfoS("First seen time not recorded for pod", "podUID", pod.UID, "pod", klog.KObj(pod)) } } // Generate final API pod status with pod and status manager status apiPodStatus := kl.generateAPIPodStatus(pod, podStatus) // The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576) // TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and // set pod IP to hostIP directly in runtime.GetPodStatus podStatus.IPs = make([]string, 0, len(apiPodStatus.PodIPs)) for _, ipInfo := range apiPodStatus.PodIPs { podStatus.IPs = append(podStatus.IPs, ipInfo.IP) } if len(podStatus.IPs) == 0 && len(apiPodStatus.PodIP) > 0 { podStatus.IPs = []string{apiPodStatus.PodIP} } // If the pod should not be running, we request the pod's containers be stopped. This is not the same // as termination (we want to stop the pod, but potentially restart it later if soft admission allows // it later). Set the status and phase appropriately runnable := kl.canRunPod(pod) if !runnable.Admit { // Pod is not runnable; and update the Pod and Container statuses to why. if apiPodStatus.Phase != v1.PodFailed && apiPodStatus.Phase != v1.PodSucceeded { apiPodStatus.Phase = v1.PodPending } apiPodStatus.Reason = runnable.Reason apiPodStatus.Message = runnable.Message // Waiting containers are not creating. const waitingReason = "Blocked" for _, cs := range apiPodStatus.InitContainerStatuses { if cs.State.Waiting != nil { cs.State.Waiting.Reason = waitingReason } } for _, cs := range apiPodStatus.ContainerStatuses { if cs.State.Waiting != nil { cs.State.Waiting.Reason = waitingReason } } } // Record the time it takes for the pod to become running. existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID) if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning && !firstSeenTime.IsZero() { metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime)) } kl.statusManager.SetPodStatus(pod, apiPodStatus) // Pods that are not runnable must be stopped - return a typed error to the pod worker if !runnable.Admit { klog.V(2).InfoS("Pod is not runnable and must have running containers stopped", "pod", klog.KObj(pod), "podUID", pod.UID, "message", runnable.Message) var syncErr error p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus) if err := kl.killPod(pod, p, nil); err != nil { kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err) syncErr = fmt.Errorf("error killing pod: %v", err) utilruntime.HandleError(syncErr) } else { // There was no error killing the pod, but the pod cannot be run. // Return an error to signal that the sync loop should back off. 
syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message) } return syncErr } // If the network plugin is not ready, only start the pod if it uses the host network if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) { kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err) return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err) } ... // Make data directories for the pod if err := kl.makePodDataDirs(pod); err != nil { ... } // Volume manager will not mount volumes for terminating pods // TODO: once context cancellation is added this check can be removed if !kl.podWorkers.IsPodTerminationRequested(pod.UID) { ... } // Fetch the pull secrets for the pod pullSecrets := kl.getPullSecretsForPod(pod) // Call the container runtime's SyncPod callback result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff) kl.reasonCache.Update(pod.UID, result) if err := result.Error(); err != nil { // Do not return error if the only failures were pods in backoff for _, r := range result.SyncResults { if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff { // Do not record an event here, as we keep all event logging for sync pod failures // local to container runtime so we get better errors return err } } return nil } return nil }
kubeRuntimeManager.SyncPod
ようやくそれらしき処理が見えてきました。
4. Create sandbox if necessary.
6. Create init containers.
7. Create normal containers.
あたりでpodsandboxの作成やアプリコンテナの起動を行っているようです。
// SyncPod syncs the running pod into the desired pod by executing following steps: // // 1. Compute sandbox and container changes. // 2. Kill pod sandbox if necessary. // 3. Kill any containers that should not be running. // 4. Create sandbox if necessary. // 5. Create ephemeral containers. // 6. Create init containers. // 7. Create normal containers. func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) { // Step 1: Compute sandbox and container changes. ... // Step 2: Kill the pod if the sandbox has changed. ... // Step 3: kill any running containers in this pod which are not to keep. ... // Step 4: Create a sandbox for the pod if necessary. podSandboxID := podContainerChanges.SandboxID if podContainerChanges.CreateSandbox { var msg string var err error klog.V(4).InfoS("Creating PodSandbox for pod", "pod", klog.KObj(pod)) metrics.StartedPodsTotal.Inc() createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod)) result.AddSyncResult(createSandboxResult) podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt) if err != nil { // createPodSandbox can return an error from CNI, CSI, // or CRI if the Pod has been deleted while the POD is // being created. If the pod has been deleted then it's // not a real error. // // SyncPod can still be running when we get here, which // means the PodWorker has not acked the deletion. if m.podStateProvider.IsPodTerminationRequested(pod.UID) { klog.V(4).InfoS("Pod was deleted and sandbox failed to be created", "pod", klog.KObj(pod), "podUID", pod.UID) return } metrics.StartedPodsErrorsTotal.WithLabelValues(err.Error()).Inc() createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg) klog.ErrorS(err, "CreatePodSandbox for pod failed", "pod", klog.KObj(pod)) ref, referr := ref.GetReference(legacyscheme.Scheme, pod) if referr != nil { klog.ErrorS(referr, "Couldn't make a ref to pod", "pod", klog.KObj(pod)) } m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedCreatePodSandBox, "Failed to create pod sandbox: %v", err) return } klog.V(4).InfoS("Created PodSandbox for pod", "podSandboxID", podSandboxID, "pod", klog.KObj(pod)) podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID) if err != nil { ref, referr := ref.GetReference(legacyscheme.Scheme, pod) if referr != nil { klog.ErrorS(referr, "Couldn't make a ref to pod", "pod", klog.KObj(pod)) } m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedStatusPodSandBox, "Unable to get pod sandbox status: %v", err) klog.ErrorS(err, "Failed to get pod sandbox status; Skipping pod", "pod", klog.KObj(pod)) result.Fail(err) return } // If we ever allow updating a pod from non-host-network to // host-network, we may use a stale IP. if !kubecontainer.IsHostNetworkPod(pod) { // Overwrite the podIPs passed in the pod status, since we just started the pod sandbox. podIPs = m.determinePodSandboxIPs(pod.Namespace, pod.Name, podSandboxStatus) klog.V(4).InfoS("Determined the ip for pod after sandbox changed", "IPs", podIPs, "pod", klog.KObj(pod)) } } // the start containers routines depend on pod ip(as in primary pod ip) // instead of trying to figure out if we have 0 < len(podIPs) // everytime, we short circuit it here podIP := "" if len(podIPs) != 0 { podIP = podIPs[0] } // Get podSandboxConfig for containers to start. 
configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID) result.AddSyncResult(configPodSandboxResult) podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt) if err != nil { message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err) klog.ErrorS(err, "GeneratePodSandboxConfig for pod failed", "pod", klog.KObj(pod)) configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message) return } // Helper containing boilerplate common to starting all types of containers. // typeName is a description used to describe this type of container in log messages, // currently: "container", "init container" or "ephemeral container" // metricLabel is the label used to describe this type of container in monitoring metrics. // currently: "container", "init_container" or "ephemeral_container" start := func(typeName, metricLabel string, spec *startSpec) error { startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name) result.AddSyncResult(startContainerResult) isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff) if isInBackOff { startContainerResult.Fail(err, msg) klog.V(4).InfoS("Backing Off restarting container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod)) return err } metrics.StartedContainersTotal.WithLabelValues(metricLabel).Inc() klog.V(4).InfoS("Creating container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod)) // NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs. if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil { // startContainer() returns well-defined error codes that have reasonable cardinality for metrics and are // useful to cluster administrators to distinguish "server errors" from "user errors". metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc() startContainerResult.Fail(err, msg) // known errors that are logged in other places are logged at higher levels here to avoid // repetitive log spam switch { case err == images.ErrImagePullBackOff: klog.V(3).InfoS("Container start failed in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod), "containerMessage", msg, "err", err) default: utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg)) } return err } return nil } // Step 5: start ephemeral containers ... // Step 6: start the init container. if container := podContainerChanges.NextInitContainerToStart; container != nil { // Start the next init container. if err := start("init container", metrics.InitContainer, containerStartSpec(container)); err != nil { return } // Successfully started the container; clear the entry in the failure klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod)) } // Step 7: start containers in podContainerChanges.ContainersToStart. for _, idx := range podContainerChanges.ContainersToStart { start("container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx])) } return }
createPodSandbox
generatePodSandboxConfigでpod sandbox用のconfigを生成し、m.runtimeService.RunPodSandboxでCRIのRunPodSandboxを呼び出しているようです。
// createPodSandbox creates a pod sandbox and returns (podSandBoxID, message, error).
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
	podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
	if err != nil {
		message := fmt.Sprintf("Failed to generate sandbox config for pod %q: %v", format.Pod(pod), err)
		klog.ErrorS(err, "Failed to generate sandbox config for pod", "pod", klog.KObj(pod))
		return "", message, err
	}

	// Create pod logs directory
	err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
	if err != nil {
		message := fmt.Sprintf("Failed to create log directory for pod %q: %v", format.Pod(pod), err)
		klog.ErrorS(err, "Failed to create log directory for pod", "pod", klog.KObj(pod))
		return "", message, err
	}

	runtimeHandler := ""
	if m.runtimeClassManager != nil {
		runtimeHandler, err = m.runtimeClassManager.LookupRuntimeHandler(pod.Spec.RuntimeClassName)
		if err != nil {
			message := fmt.Sprintf("Failed to create sandbox for pod %q: %v", format.Pod(pod), err)
			return "", message, err
		}
		if runtimeHandler != "" {
			klog.V(2).InfoS("Running pod with runtime handler", "pod", klog.KObj(pod), "runtimeHandler", runtimeHandler)
		}
	}

	podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)
	if err != nil {
		message := fmt.Sprintf("Failed to create sandbox for pod %q: %v", format.Pod(pod), err)
		klog.ErrorS(err, "Failed to create sandbox for pod", "pod", klog.KObj(pod))
		return "", message, err
	}

	return podSandBoxID, "", nil
}
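参考までに、generatePodSandboxConfigが組み立ててRunPodSandboxRequest.Configとしてランタイムに渡されるPodSandboxConfigのイメージを書いておきます(フィールドは抜粋、値はすべて仮です)。

package sketch

// PodSandboxConfigのイメージ(フィールドは抜粋、値は仮)。
import (
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
)

func examplePodSandboxConfig() *runtimeapi.PodSandboxConfig {
	return &runtimeapi.PodSandboxConfig{
		Metadata: &runtimeapi.PodSandboxMetadata{
			Name:      "nginx-7d8b49557c-abcde",
			Namespace: "default",
			Uid:       "pod-uid",
			Attempt:   0,
		},
		Hostname:     "nginx-7d8b49557c-abcde",
		LogDirectory: "/var/log/pods/default_nginx-7d8b49557c-abcde_pod-uid/",
		DnsConfig: &runtimeapi.DNSConfig{
			Servers:  []string{"10.96.0.10"},
			Searches: []string{"default.svc.cluster.local", "svc.cluster.local", "cluster.local"},
		},
		PortMappings: []*runtimeapi.PortMapping{{ContainerPort: 80}},
		Labels:       map[string]string{"io.kubernetes.pod.name": "nginx-7d8b49557c-abcde"},
		Linux: &runtimeapi.LinuxPodSandboxConfig{
			CgroupParent: "/kubepods/burstable/pod-uid",
		},
	}
}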
RunPodSandbox(CRI client)
RunPodSandboxはgRPCのRPCとして定義されていて、kubelet側にはremoteRuntimeServiceとしてCRI clientが実装されています。
func (r *remoteRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
	// Use 2 times longer timeout for sandbox operation (4 mins by default)
	// TODO: Make the pod sandbox timeout configurable.
	timeout := r.timeout * 2
	klog.V(10).InfoS("[RemoteRuntimeService] RunPodSandbox", "config", config, "runtimeHandler", runtimeHandler, "timeout", timeout)
	ctx, cancel := getContextWithTimeout(timeout)
	defer cancel()

	resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
		Config:         config,
		RuntimeHandler: runtimeHandler,
	})
	if err != nil {
		klog.ErrorS(err, "RunPodSandbox from runtime service failed")
		return "", err
	}

	if resp.PodSandboxId == "" {
		errorMessage := fmt.Sprintf("PodSandboxId is not set for sandbox %q", config.GetMetadata())
		err := errors.New(errorMessage)
		klog.ErrorS(err, "RunPodSandbox failed")
		return "", err
	}

	klog.V(10).InfoS("[RemoteRuntimeService] RunPodSandbox Response", "podSandboxID", resp.PodSandboxId)

	return resp.PodSandboxId, nil
}
service RuntimeService { // Version returns the runtime name, runtime version, and runtime API version. rpc Version(VersionRequest) returns (VersionResponse) {} // RunPodSandbox creates and starts a pod-level sandbox. Runtimes must ensure // the sandbox is in the ready state on success. rpc RunPodSandbox(RunPodSandboxRequest) returns (RunPodSandboxResponse) {} // StopPodSandbox stops any running process that is part of the sandbox and // reclaims network resources (e.g., IP addresses) allocated to the sandbox. // If there are any running containers in the sandbox, they must be forcibly // terminated. // This call is idempotent, and must not return an error if all relevant // resources have already been reclaimed. kubelet will call StopPodSandbox // at least once before calling RemovePodSandbox. It will also attempt to // reclaim resources eagerly, as soon as a sandbox is not needed. Hence, // multiple StopPodSandbox calls are expected. rpc StopPodSandbox(StopPodSandboxRequest) returns (StopPodSandboxResponse) {} // RemovePodSandbox removes the sandbox. If there are any running containers // in the sandbox, they must be forcibly terminated and removed. // This call is idempotent, and must not return an error if the sandbox has // already been removed. rpc RemovePodSandbox(RemovePodSandboxRequest) returns (RemovePodSandboxResponse) {} // PodSandboxStatus returns the status of the PodSandbox. If the PodSandbox is not // present, returns an error. rpc PodSandboxStatus(PodSandboxStatusRequest) returns (PodSandboxStatusResponse) {} // ListPodSandbox returns a list of PodSandboxes. rpc ListPodSandbox(ListPodSandboxRequest) returns (ListPodSandboxResponse) {} // CreateContainer creates a new container in specified PodSandbox rpc CreateContainer(CreateContainerRequest) returns (CreateContainerResponse) {} // StartContainer starts the container. rpc StartContainer(StartContainerRequest) returns (StartContainerResponse) {} // StopContainer stops a running container with a grace period (i.e., timeout). // This call is idempotent, and must not return an error if the container has // already been stopped. // TODO: what must the runtime do after the grace period is reached? rpc StopContainer(StopContainerRequest) returns (StopContainerResponse) {} // RemoveContainer removes the container. If the container is running, the // container must be forcibly removed. // This call is idempotent, and must not return an error if the container has // already been removed. rpc RemoveContainer(RemoveContainerRequest) returns (RemoveContainerResponse) {} // ListContainers lists all containers by filters. rpc ListContainers(ListContainersRequest) returns (ListContainersResponse) {} // ContainerStatus returns status of the container. If the container is not // present, returns an error. rpc ContainerStatus(ContainerStatusRequest) returns (ContainerStatusResponse) {} // UpdateContainerResources updates ContainerConfig of the container. rpc UpdateContainerResources(UpdateContainerResourcesRequest) returns (UpdateContainerResourcesResponse) {} // ReopenContainerLog asks runtime to reopen the stdout/stderr log file // for the container. This is often called after the log file has been // rotated. If the container is not running, container runtime can choose // to either create a new log file and return nil, or return an error. // Once it returns error, new container log file MUST NOT be created. 
rpc ReopenContainerLog(ReopenContainerLogRequest) returns (ReopenContainerLogResponse) {} // ExecSync runs a command in a container synchronously. rpc ExecSync(ExecSyncRequest) returns (ExecSyncResponse) {} // Exec prepares a streaming endpoint to execute a command in the container. rpc Exec(ExecRequest) returns (ExecResponse) {} // Attach prepares a streaming endpoint to attach to a running container. rpc Attach(AttachRequest) returns (AttachResponse) {} // PortForward prepares a streaming endpoint to forward ports from a PodSandbox. rpc PortForward(PortForwardRequest) returns (PortForwardResponse) {} // ContainerStats returns stats of the container. If the container does not // exist, the call returns an error. rpc ContainerStats(ContainerStatsRequest) returns (ContainerStatsResponse) {} // ListContainerStats returns stats of all running containers. rpc ListContainerStats(ListContainerStatsRequest) returns (ListContainerStatsResponse) {} // PodSandboxStats returns stats of the pod sandbox. If the pod sandbox does not // exist, the call returns an error. rpc PodSandboxStats(PodSandboxStatsRequest) returns (PodSandboxStatsResponse) {} // ListPodSandboxStats returns stats of the pod sandboxes matching a filter. rpc ListPodSandboxStats(ListPodSandboxStatsRequest) returns (ListPodSandboxStatsResponse) {} // UpdateRuntimeConfig updates the runtime configuration based on the given request. rpc UpdateRuntimeConfig(UpdateRuntimeConfigRequest) returns (UpdateRuntimeConfigResponse) {} // Status returns the status of the runtime. rpc Status(StatusRequest) returns (StatusResponse) {} } func (c *runtimeServiceClient) RunPodSandbox(ctx context.Context, in *RunPodSandboxRequest, opts ...grpc.CallOption) (*RunPodSandboxResponse, error) { out := new(RunPodSandboxResponse) err := c.cc.Invoke(ctx, "/runtime.v1alpha2.RuntimeService/RunPodSandbox", in, out, opts...) if err != nil { return nil, err } return out, nil }
RunPodSandbox(docker)
Step 2のds.client.CreateContainerでpauseコンテナの作成、Step 4のds.client.StartContainerで起動を行っています。
以降はdocker cliからdockerdにリクエストを送る要領でコンテナが作成されていくはずです。
他には、resolv.confのオーバーライドやCNIによるネットワーク設定が行われます。
kubernetes/pkg/kubelet/dockershim/docker_sandbox.go // RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure // the sandbox is in ready state. // For docker, PodSandbox is implemented by a container holding the network // namespace for the pod. // Note: docker doesn't use LogDirectory (yet). func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) { config := r.GetConfig() // Step 1: Pull the image for the sandbox. image := defaultSandboxImage podSandboxImage := ds.podSandboxImage if len(podSandboxImage) != 0 { image = podSandboxImage } // NOTE: To use a custom sandbox image in a private repository, users need to configure the nodes with credentials properly. // see: https://kubernetes.io/docs/user-guide/images/#configuring-nodes-to-authenticate-to-a-private-registry // Only pull sandbox image when it's not present - v1.PullIfNotPresent. if err := ensureSandboxImageExists(ds.client, image); err != nil { return nil, err } // Step 2: Create the sandbox container. if r.GetRuntimeHandler() != "" && r.GetRuntimeHandler() != runtimeName { return nil, fmt.Errorf("RuntimeHandler %q not supported", r.GetRuntimeHandler()) } createConfig, err := ds.makeSandboxDockerConfig(config, image) if err != nil { return nil, fmt.Errorf("failed to make sandbox docker config for pod %q: %v", config.Metadata.Name, err) } createResp, err := ds.client.CreateContainer(*createConfig) if err != nil { createResp, err = recoverFromCreationConflictIfNeeded(ds.client, *createConfig, err) } if err != nil || createResp == nil { return nil, fmt.Errorf("failed to create a sandbox for pod %q: %v", config.Metadata.Name, err) } resp := &runtimeapi.RunPodSandboxResponse{PodSandboxId: createResp.ID} ds.setNetworkReady(createResp.ID, false) defer func(e *error) { // Set networking ready depending on the error return of // the parent function if *e == nil { ds.setNetworkReady(createResp.ID, true) } }(&err) // Step 3: Create Sandbox Checkpoint. if err = ds.checkpointManager.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil { return nil, err } // Step 4: Start the sandbox container. // Assume kubelet's garbage collector would remove the sandbox later, if // startContainer failed. err = ds.client.StartContainer(createResp.ID) if err != nil { return nil, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.Metadata.Name, err) } // Rewrite resolv.conf file generated by docker. // NOTE: cluster dns settings aren't passed anymore to docker api in all cases, // not only for pods with host network: the resolver conf will be overwritten // after sandbox creation to override docker's behaviour. This resolv.conf // file is shared by all containers of the same pod, and needs to be modified // only once per pod. if dnsConfig := config.GetDnsConfig(); dnsConfig != nil { containerInfo, err := ds.client.InspectContainer(createResp.ID) if err != nil { return nil, fmt.Errorf("failed to inspect sandbox container for pod %q: %v", config.Metadata.Name, err) } if err := rewriteResolvFile(containerInfo.ResolvConfPath, dnsConfig.Servers, dnsConfig.Searches, dnsConfig.Options); err != nil { return nil, fmt.Errorf("rewrite resolv.conf failed for pod %q: %v", config.Metadata.Name, err) } } // Do not invoke network plugins if in hostNetwork mode. 
if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtimeapi.NamespaceMode_NODE { return resp, nil } // Step 5: Setup networking for the sandbox. // All pod networking is setup by a CNI plugin discovered at startup time. // This plugin assigns the pod ip, sets up routes inside the sandbox, // creates interfaces etc. In theory, its jurisdiction ends with pod // sandbox networking, but it might insert iptables rules or open ports // on the host as well, to satisfy parts of the pod spec that aren't // recognized by the CNI standard yet. cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID) networkOptions := make(map[string]string) if dnsConfig := config.GetDnsConfig(); dnsConfig != nil { // Build DNS options. dnsOption, err := json.Marshal(dnsConfig) if err != nil { return nil, fmt.Errorf("failed to marshal dns config for pod %q: %v", config.Metadata.Name, err) } networkOptions["dns"] = string(dnsOption) } err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations, networkOptions) if err != nil { errList := []error{fmt.Errorf("failed to set up sandbox container %q network for pod %q: %v", createResp.ID, config.Metadata.Name, err)} // Ensure network resources are cleaned up even if the plugin // succeeded but an error happened between that success and here. err = ds.network.TearDownPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID) if err != nil { errList = append(errList, fmt.Errorf("failed to clean up sandbox container %q network for pod %q: %v", createResp.ID, config.Metadata.Name, err)) } err = ds.client.StopContainer(createResp.ID, defaultSandboxGracePeriod) if err != nil { errList = append(errList, fmt.Errorf("failed to stop sandbox container %q for pod %q: %v", createResp.ID, config.Metadata.Name, err)) } return resp, utilerrors.NewAggregate(errList) } return resp, nil }
RunPodSandbox(containerd)
containerdのサーバ側の処理も確認してみると、dockershimの処理にもあったようなコンテナ作成やネットワーク設定を行っているらしき処理が見えます。
containerdではコンテナの実行プロセスをtaskとして管理するため、コンテナの作成・起動はcontainer.NewTaskやtask.Startが該当すると思われます(コードの後に雑なスケッチも載せます)。
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure // the sandbox is in ready state. func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (_ *runtime.RunPodSandboxResponse, retErr error) { config := r.GetConfig() log.G(ctx).Debugf("Sandbox config %+v", config) // Generate unique id and name for the sandbox and reserve the name. id := util.GenerateID() metadata := config.GetMetadata() if metadata == nil { return nil, errors.New("sandbox config must include metadata") } name := makeSandboxName(metadata) log.G(ctx).Debugf("Generated id %q for sandbox %q", id, name) // Reserve the sandbox name to avoid concurrent `RunPodSandbox` request starting the // same sandbox. if err := c.sandboxNameIndex.Reserve(name, id); err != nil { return nil, errors.Wrapf(err, "failed to reserve sandbox name %q", name) } defer func() { // Release the name if the function returns with an error. if retErr != nil { c.sandboxNameIndex.ReleaseByName(name) } }() // Create initial internal sandbox object. sandbox := sandboxstore.NewSandbox( sandboxstore.Metadata{ ID: id, Name: name, Config: config, RuntimeHandler: r.GetRuntimeHandler(), }, sandboxstore.Status{ State: sandboxstore.StateUnknown, }, ) // Ensure sandbox container image snapshot. image, err := c.ensureImageExists(ctx, c.config.SandboxImage, config) if err != nil { return nil, errors.Wrapf(err, "failed to get sandbox image %q", c.config.SandboxImage) } containerdImage, err := c.toContainerdImage(ctx, *image) if err != nil { return nil, errors.Wrapf(err, "failed to get image from containerd %q", image.ID) } ociRuntime, err := c.getSandboxRuntime(config, r.GetRuntimeHandler()) if err != nil { return nil, errors.Wrap(err, "failed to get sandbox runtime") } log.G(ctx).Debugf("Use OCI %+v for sandbox %q", ociRuntime, id) podNetwork := true if goruntime.GOOS != "windows" && config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtime.NamespaceMode_NODE { // Pod network is not needed on linux with host network. podNetwork = false } if goruntime.GOOS == "windows" && config.GetWindows().GetSecurityContext().GetHostProcess() { //Windows HostProcess pods can only run on the host network podNetwork = false } if podNetwork { // If it is not in host network namespace then create a namespace and set the sandbox // handle. NetNSPath in sandbox metadata and NetNS is non empty only for non host network // namespaces. If the pod is in host network namespace then both are empty and should not // be used. var netnsMountDir = "/var/run/netns" if c.config.NetNSMountsUnderStateDir { netnsMountDir = filepath.Join(c.config.StateDir, "netns") } sandbox.NetNS, err = netns.NewNetNS(netnsMountDir) if err != nil { return nil, errors.Wrapf(err, "failed to create network namespace for sandbox %q", id) } sandbox.NetNSPath = sandbox.NetNS.GetPath() defer func() { if retErr != nil { deferCtx, deferCancel := ctrdutil.DeferContext() defer deferCancel() // Teardown network if an error is returned. if err := c.teardownPodNetwork(deferCtx, sandbox); err != nil { log.G(ctx).WithError(err).Errorf("Failed to destroy network for sandbox %q", id) } if err := sandbox.NetNS.Remove(); err != nil { log.G(ctx).WithError(err).Errorf("Failed to remove network namespace %s for sandbox %q", sandbox.NetNSPath, id) } sandbox.NetNSPath = "" } }() // Setup network for sandbox. 
// Certain VM based solutions like clear containers (Issue containerd/cri-containerd#524) // rely on the assumption that CRI shim will not be querying the network namespace to check the // network states such as IP. // In future runtime implementation should avoid relying on CRI shim implementation details. // In this case however caching the IP will add a subtle performance enhancement by avoiding // calls to network namespace of the pod to query the IP of the veth interface on every // SandboxStatus request. if err := c.setupPodNetwork(ctx, &sandbox); err != nil { return nil, errors.Wrapf(err, "failed to setup network for sandbox %q", id) } } // Create sandbox container. // NOTE: sandboxContainerSpec SHOULD NOT have side // effect, e.g. accessing/creating files, so that we can test // it safely. spec, err := c.sandboxContainerSpec(id, config, &image.ImageSpec.Config, sandbox.NetNSPath, ociRuntime.PodAnnotations) if err != nil { return nil, errors.Wrap(err, "failed to generate sandbox container spec") } log.G(ctx).Debugf("Sandbox container %q spec: %#+v", id, spew.NewFormatter(spec)) sandbox.ProcessLabel = spec.Process.SelinuxLabel defer func() { if retErr != nil { selinux.ReleaseLabel(sandbox.ProcessLabel) } }() // handle any KVM based runtime if err := modifyProcessLabel(ociRuntime.Type, spec); err != nil { return nil, err } if config.GetLinux().GetSecurityContext().GetPrivileged() { // If privileged don't set selinux label, but we still record the MCS label so that // the unused label can be freed later. spec.Process.SelinuxLabel = "" } // Generate spec options that will be applied to the spec later. specOpts, err := c.sandboxContainerSpecOpts(config, &image.ImageSpec.Config) if err != nil { return nil, errors.Wrap(err, "failed to generate sanbdox container spec options") } sandboxLabels := buildLabels(config.Labels, image.ImageSpec.Config.Labels, containerKindSandbox) runtimeOpts, err := generateRuntimeOptions(ociRuntime, c.config) if err != nil { return nil, errors.Wrap(err, "failed to generate runtime options") } snapshotterOpt := snapshots.WithLabels(snapshots.FilterInheritedLabels(config.Annotations)) opts := []containerd.NewContainerOpts{ containerd.WithSnapshotter(c.config.ContainerdConfig.Snapshotter), customopts.WithNewSnapshot(id, containerdImage, snapshotterOpt), containerd.WithSpec(spec, specOpts...), containerd.WithContainerLabels(sandboxLabels), containerd.WithContainerExtension(sandboxMetadataExtension, &sandbox.Metadata), containerd.WithRuntime(ociRuntime.Type, runtimeOpts)} container, err := c.client.NewContainer(ctx, id, opts...) if err != nil { return nil, errors.Wrap(err, "failed to create containerd container") } defer func() { if retErr != nil { deferCtx, deferCancel := ctrdutil.DeferContext() defer deferCancel() if err := container.Delete(deferCtx, containerd.WithSnapshotCleanup); err != nil { log.G(ctx).WithError(err).Errorf("Failed to delete containerd container %q", id) } } }() // Create sandbox container root directories. sandboxRootDir := c.getSandboxRootDir(id) if err := c.os.MkdirAll(sandboxRootDir, 0755); err != nil { return nil, errors.Wrapf(err, "failed to create sandbox root directory %q", sandboxRootDir) } defer func() { if retErr != nil { // Cleanup the sandbox root directory. 
if err := c.os.RemoveAll(sandboxRootDir); err != nil { log.G(ctx).WithError(err).Errorf("Failed to remove sandbox root directory %q", sandboxRootDir) } } }() volatileSandboxRootDir := c.getVolatileSandboxRootDir(id) if err := c.os.MkdirAll(volatileSandboxRootDir, 0755); err != nil { return nil, errors.Wrapf(err, "failed to create volatile sandbox root directory %q", volatileSandboxRootDir) } defer func() { if retErr != nil { // Cleanup the volatile sandbox root directory. if err := c.os.RemoveAll(volatileSandboxRootDir); err != nil { log.G(ctx).WithError(err).Errorf("Failed to remove volatile sandbox root directory %q", volatileSandboxRootDir) } } }() // Setup files required for the sandbox. if err = c.setupSandboxFiles(id, config); err != nil { return nil, errors.Wrapf(err, "failed to setup sandbox files") } defer func() { if retErr != nil { if err = c.cleanupSandboxFiles(id, config); err != nil { log.G(ctx).WithError(err).Errorf("Failed to cleanup sandbox files in %q", sandboxRootDir) } } }() // Update sandbox created timestamp. info, err := container.Info(ctx) if err != nil { return nil, errors.Wrap(err, "failed to get sandbox container info") } // Create sandbox task in containerd. log.G(ctx).Tracef("Create sandbox container (id=%q, name=%q).", id, name) taskOpts := c.taskOpts(ociRuntime.Type) // We don't need stdio for sandbox container. task, err := container.NewTask(ctx, containerdio.NullIO, taskOpts...) if err != nil { return nil, errors.Wrap(err, "failed to create containerd task") } defer func() { if retErr != nil { deferCtx, deferCancel := ctrdutil.DeferContext() defer deferCancel() // Cleanup the sandbox container if an error is returned. if _, err := task.Delete(deferCtx, WithNRISandboxDelete(id), containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { log.G(ctx).WithError(err).Errorf("Failed to delete sandbox container %q", id) } } }() // wait is a long running background request, no timeout needed. exitCh, err := task.Wait(ctrdutil.NamespacedContext()) if err != nil { return nil, errors.Wrap(err, "failed to wait for sandbox container task") } nric, err := nri.New() if err != nil { return nil, errors.Wrap(err, "unable to create nri client") } if nric != nil { nriSB := &nri.Sandbox{ ID: id, Labels: config.Labels, } if _, err := nric.InvokeWithSandbox(ctx, task, v1.Create, nriSB); err != nil { return nil, errors.Wrap(err, "nri invoke") } } if err := task.Start(ctx); err != nil { return nil, errors.Wrapf(err, "failed to start sandbox container task %q", id) } if err := sandbox.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) { // Set the pod sandbox as ready after successfully start sandbox container. status.Pid = task.Pid() status.State = sandboxstore.StateReady status.CreatedAt = info.CreatedAt return status, nil }); err != nil { return nil, errors.Wrap(err, "failed to update sandbox status") } // Add sandbox into sandbox store in INIT state. sandbox.Container = container if err := c.sandboxStore.Add(sandbox); err != nil { return nil, errors.Wrapf(err, "failed to add sandbox %+v into store", sandbox) } // start the monitor after adding sandbox into the store, this ensures // that sandbox is in the store, when event monitor receives the TaskExit event. // // TaskOOM from containerd may come before sandbox is added to store, // but we don't care about sandbox TaskOOM right now, so it is fine. c.eventMonitor.startSandboxExitMonitor(context.Background(), id, task.Pid(), exitCh) return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil }
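長いので、NewContainer→NewTask→task.Startの流れだけを抜き出して、containerdクライアントを直接使う形に書き直すとこんなイメージだと思います。namespaceやイメージ名は仮で、CNIやsandbox storeまわりは省略した雑なスケッチです。

package main

// containerdクライアントでコンテナ(メタデータ)→task(実行プロセス)を作って起動するだけのスケッチ。
import (
	"context"
	"log"

	"github.com/containerd/containerd"
	"github.com/containerd/containerd/cio"
	"github.com/containerd/containerd/namespaces"
	"github.com/containerd/containerd/oci"
)

func main() {
	client, err := containerd.New("/run/containerd/containerd.sock")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// CRIプラグインはk8s.ioというnamespaceを使う
	ctx := namespaces.WithNamespace(context.Background(), "k8s.io")

	// sandbox用イメージ(仮にpauseを指定)
	image, err := client.Pull(ctx, "k8s.gcr.io/pause:3.5", containerd.WithPullUnpack)
	if err != nil {
		log.Fatal(err)
	}

	// コンテナ(メタデータ+snapshot+OCI spec)を作成
	container, err := client.NewContainer(ctx, "sandbox-sketch",
		containerd.WithNewSnapshot("sandbox-sketch-snapshot", image),
		containerd.WithNewSpec(oci.WithImageConfig(image)),
	)
	if err != nil {
		log.Fatal(err)
	}

	// task(実行プロセスの器)を作成。sandboxなのでstdioは捨てる
	task, err := container.NewTask(ctx, cio.NullIO)
	if err != nil {
		log.Fatal(err)
	}

	// 実際にプロセスを起動
	if err := task.Start(ctx); err != nil {
		log.Fatal(err)
	}
}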
startContainer
startContainerでは、イメージのプル、コンテナの作成、コンテナの起動、post startフックの実行が行われます。
なお、デフォルトのサンドボックスイメージにはgcr.io/google_containers/pause-amd64:3.0のようなpauseイメージが指定されているらしいです。
// startContainer starts a container and returns a message indicates why it is failed on error.
// It starts the container through the following steps:
// * pull the image
// * create the container
// * start the container
// * run the post start lifecycle hooks (if applicable)
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
	container := spec.container

	// Step 1: pull the image.
	imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
	...

	// Step 2: create the container.
	// For a new container, the RestartCount should be 0
	restartCount := 0
	containerStatus := podStatus.FindContainerStatusByName(container.Name)
	...
	target, err := spec.getTargetID(podStatus)
	containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)

	// Step 3: start the container.
	err = m.runtimeService.StartContainer(containerID)
	m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, fmt.Sprintf("Started container %s", container.Name))

	containerMeta := containerConfig.GetMetadata()
	sandboxMeta := podSandboxConfig.GetMetadata()
	legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name, sandboxMeta.Namespace)
	containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)

	// Step 4: execute the post start hook.
	if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
		kubeContainerID := kubecontainer.ContainerID{
			Type: m.runtimeName,
			ID:   containerID,
		}
		msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
	}

	return "", nil
}