
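Written by | 段全锋
Edited by | zouyee
段全锋: software engineer, familiar with Kubernetes architecture and well versed in the low-level technical details of container runtimes.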

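Our production K8s clusters have finished switching their container runtime from Docker to Containerd. Some colleagues are not familiar with the components in the K8s-to-Containerd call chain, and several questions keep coming up:

- How do Containerd and runc relate to each other?
- How do Docker and Containerd differ?
- What is the flow when K8s calls Containerd to create a container?
- How does runc ultimately create the container?

This article walks through the entire call chain when K8s uses Containerd as its runtime, with source-level analysis.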


“Runtime overview”
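OCI (Open Container Initiative) defines two specifications:
a. Runtime specification: defines how to run a container from an unpacked filesystem bundle.
b. Image specification: defines how a container image is packaged and how an image is converted into a bundle.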
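Runtimes are commonly divided into low-level and high-level runtimes:

- Low-level runtimes focus only on creating and running containers. Examples are runc and kata.
- High-level runtimes add upper-layer features such as image management. Docker and containerd are the typical examples.

The kubelet creates containers by calling a container runtime. There are too many runtimes for Kubernetes to support each one individually, so Kubernetes defines the CRI (Container Runtime Interface). Any runtime that implements this interface can be used.

The figures below show the call chains when Kubernetes uses Docker and when it uses containerd:

- With containerd, the CRI interface is implemented inside containerd's own code.
- With Docker, the CRI interface is implemented in Kubernetes' code, in a component called dockershim (kubernetes/pkg/kubelet/dockershim/docker_service.go).

That code lived in Kubernetes for historical reasons: at the time, Docker was the de facto industry standard for containers. As more and more runtimes gained CRI support, maintaining dockershim became a growing burden on the community. It has since been removed from Kubernetes (as of v1.24), and Mirantis now maintains it outside the Kubernetes tree as cri-dockerd.

The figure below shows how this plugin evolved.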
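The point of CRI is that the kubelet is written once against an interface, and each runtime either implements it natively (containerd) or is wrapped by an adapter (dockershim). The sketch below shows that shape in plain Go. `RuntimeService` here is a hypothetical, heavily trimmed interface with simplified signatures, not the real gRPC-generated CRI API.

```go
package main

import (
	"errors"
	"fmt"
)

// RuntimeService is a hypothetical, heavily trimmed stand-in for the CRI
// RuntimeService; the "kubelet" below depends only on this interface.
type RuntimeService interface {
	RunPodSandbox(podName string) (sandboxID string, err error)
	Version() string
}

// containerdCRI models a runtime that implements CRI itself,
// like containerd's built-in cri plugin.
type containerdCRI struct{ next int }

func (c *containerdCRI) RunPodSandbox(podName string) (string, error) {
	if podName == "" {
		return "", errors.New("sandbox config must include metadata")
	}
	c.next++
	return fmt.Sprintf("containerd-sandbox-%d", c.next), nil
}

func (c *containerdCRI) Version() string { return "containerd (built-in CRI)" }

// dockerShim models an adapter that sits outside the runtime and translates
// CRI calls into Docker API calls (the role dockershim played).
type dockerShim struct{ next int }

func (d *dockerShim) RunPodSandbox(podName string) (string, error) {
	if podName == "" {
		return "", errors.New("sandbox config must include metadata")
	}
	d.next++
	// A real shim would call the Docker Engine API here.
	return fmt.Sprintf("docker-sandbox-%d", d.next), nil
}

func (d *dockerShim) Version() string { return "docker (via shim)" }

// startPod is the "kubelet" side: it never knows which runtime it talks to.
func startPod(rt RuntimeService, podName string) (string, error) {
	return rt.RunPodSandbox(podName)
}

func main() {
	for _, rt := range []RuntimeService{&containerdCRI{}, &dockerShim{}} {
		id, err := startPod(rt, "nginx")
		fmt.Println(rt.Version(), id, err)
	}
}
```

Swapping runtimes therefore means swapping the value behind the interface. The caller does not change.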


“Containerd CRI overview”

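Containerd has several kinds of clients, such as Kubernetes and Docker. To keep the containers and images of different clients isolated from each other, Containerd has a namespace concept. By default, Docker uses the namespace moby and Kubernetes uses k8s.io.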
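In Containerd, a container represents only a container's metadata. A Task takes a container object and turns it into a process that can run in the operating system, so the Task represents the runnable object inside the container.
Containerd's built-in cri module implements the Kubernetes CRI, which is why the kubelet can use containerd directly. CRI consists of two services, RuntimeService and ImageService: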
// Runtime service defines the public APIs for remote container runtimes
service RuntimeService {
    // Version returns the runtime name, runtime version, and runtime API version.
    rpc Version(VersionRequest) returns (VersionResponse) {}

    // RunPodSandbox creates and starts a pod-level sandbox. Runtimes must ensure
    // the sandbox is in the ready state on success.
    rpc RunPodSandbox(RunPodSandboxRequest) returns (RunPodSandboxResponse) {}
    // StopPodSandbox stops any running process that is part of the sandbox and
    // reclaims network resources (e.g., IP addresses) allocated to the sandbox.
    // If there are any running containers in the sandbox, they must be forcibly
    // terminated.
    // This call is idempotent, and must not return an error if all relevant
    // resources have already been reclaimed. kubelet will call StopPodSandbox
    // at least once before calling RemovePodSandbox. It will also attempt to
    // reclaim resources eagerly, as soon as a sandbox is not needed. Hence,
    // multiple StopPodSandbox calls are expected.
    rpc StopPodSandbox(StopPodSandboxRequest) returns (StopPodSandboxResponse) {}
    // RemovePodSandbox removes the sandbox. If there are any running containers
    // in the sandbox, they must be forcibly terminated and removed.
    // This call is idempotent, and must not return an error if the sandbox has
    // already been removed.
    rpc RemovePodSandbox(RemovePodSandboxRequest) returns (RemovePodSandboxResponse) {}
    // PodSandboxStatus returns the status of the PodSandbox. If the PodSandbox is not
    // present, returns an error.
    rpc PodSandboxStatus(PodSandboxStatusRequest) returns (PodSandboxStatusResponse) {}
    // ListPodSandbox returns a list of PodSandboxes.
    rpc ListPodSandbox(ListPodSandboxRequest) returns (ListPodSandboxResponse) {}

    // CreateContainer creates a new container in specified PodSandbox
    rpc CreateContainer(CreateContainerRequest) returns (CreateContainerResponse) {}
    // StartContainer starts the container.
    rpc StartContainer(StartContainerRequest) returns (StartContainerResponse) {}
    // StopContainer stops a running container with a grace period (i.e., timeout).
    // This call is idempotent, and must not return an error if the container has
    // already been stopped.
    // The runtime must forcibly kill the container after the grace period is
    // reached.
    rpc StopContainer(StopContainerRequest) returns (StopContainerResponse) {}
    // RemoveContainer removes the container. If the container is running, the
    // container must be forcibly removed.
    // This call is idempotent, and must not return an error if the container has
    // already been removed.
    rpc RemoveContainer(RemoveContainerRequest) returns (RemoveContainerResponse) {}
    // ListContainers lists all containers by filters.
    rpc ListContainers(ListContainersRequest) returns (ListContainersResponse) {}
    // ContainerStatus returns status of the container. If the container is not
    // present, returns an error.
    rpc ContainerStatus(ContainerStatusRequest) returns (ContainerStatusResponse) {}
    // UpdateContainerResources updates ContainerConfig of the container synchronously.
    // If runtime fails to transactionally update the requested resources, an error is returned.
    rpc UpdateContainerResources(UpdateContainerResourcesRequest) returns (UpdateContainerResourcesResponse) {}
    // ReopenContainerLog asks runtime to reopen the stdout/stderr log file
    // for the container. This is often called after the log file has been
    // rotated. If the container is not running, container runtime can choose
    // to either create a new log file and return nil, or return an error.
    // Once it returns error, new container log file MUST NOT be created.
    rpc ReopenContainerLog(ReopenContainerLogRequest) returns (ReopenContainerLogResponse) {}

    // ExecSync runs a command in a container synchronously.
    rpc ExecSync(ExecSyncRequest) returns (ExecSyncResponse) {}
    // Exec prepares a streaming endpoint to execute a command in the container.
    rpc Exec(ExecRequest) returns (ExecResponse) {}
    // Attach prepares a streaming endpoint to attach to a running container.
    rpc Attach(AttachRequest) returns (AttachResponse) {}
    // PortForward prepares a streaming endpoint to forward ports from a PodSandbox.
    rpc PortForward(PortForwardRequest) returns (PortForwardResponse) {}

    // ContainerStats returns stats of the container. If the container does not
    // exist, the call returns an error.
    rpc ContainerStats(ContainerStatsRequest) returns (ContainerStatsResponse) {}
    // ListContainerStats returns stats of all running containers.
    rpc ListContainerStats(ListContainerStatsRequest) returns (ListContainerStatsResponse) {}

    // PodSandboxStats returns stats of the pod sandbox. If the pod sandbox does not
    // exist, the call returns an error.
    rpc PodSandboxStats(PodSandboxStatsRequest) returns (PodSandboxStatsResponse) {}
    // ListPodSandboxStats returns stats of the pod sandboxes matching a filter.
    rpc ListPodSandboxStats(ListPodSandboxStatsRequest) returns (ListPodSandboxStatsResponse) {}

    // UpdateRuntimeConfig updates the runtime configuration based on the given request.
    rpc UpdateRuntimeConfig(UpdateRuntimeConfigRequest) returns (UpdateRuntimeConfigResponse) {}

    // Status returns the status of the runtime.
    rpc Status(StatusRequest) returns (StatusResponse) {}
}

// ImageService defines the public APIs for managing images.
service ImageService {
    // ListImages lists existing images.
    rpc ListImages(ListImagesRequest) returns (ListImagesResponse) {}
    // ImageStatus returns the status of the image. If the image is not
    // present, returns a response with ImageStatusResponse.Image set to
    // nil.
    rpc ImageStatus(ImageStatusRequest) returns (ImageStatusResponse) {}
    // PullImage pulls an image with authentication config.
    rpc PullImage(PullImageRequest) returns (PullImageResponse) {}
    // RemoveImage removes the image.
    // This call is idempotent, and must not return an error if the image has
    // already been removed.
    rpc RemoveImage(RemoveImageRequest) returns (RemoveImageResponse) {}
    // ImageFSInfo returns information of the filesystem that is used to store images.
    rpc ImageFsInfo(ImageFsInfoRequest) returns (ImageFsInfoResponse) {}
}
When the kubelet calls CRI to create a Pod with two business containers, A and B, the flow is:
① Create a sandbox for the Pod
② Create container A
③ Start container A
④ Create container B
⑤ Start container B
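The call order above can be sketched with a fake runtime that records each call. The method names mirror the CRI RPCs, but `fakeCRI` and `syncPod` are hypothetical and the signatures are simplified. The sketch also leaves out image pulls (ImageService.PullImage) and other details of the real kubelet.

```go
package main

import "fmt"

// fakeCRI records every call so the order can be inspected.
// Method names mirror CRI RPCs; signatures are simplified (hypothetical).
type fakeCRI struct{ calls []string }

func (f *fakeCRI) RunPodSandbox(pod string) string {
	f.calls = append(f.calls, "RunPodSandbox "+pod)
	return "sb-" + pod
}

func (f *fakeCRI) CreateContainer(sandboxID, name string) string {
	f.calls = append(f.calls, "CreateContainer "+name)
	return sandboxID + "/" + name
}

func (f *fakeCRI) StartContainer(containerID string) {
	f.calls = append(f.calls, "StartContainer "+containerID)
}

// syncPod follows the order described above: the sandbox first, then each
// business container is created and started in turn.
func syncPod(rt *fakeCRI, pod string, containers []string) {
	sb := rt.RunPodSandbox(pod)
	for _, name := range containers {
		id := rt.CreateContainer(sb, name)
		rt.StartContainer(id)
	}
}

func main() {
	rt := &fakeCRI{}
	syncPod(rt, "demo", []string{"A", "B"})
	for i, c := range rt.calls {
		fmt.Printf("%d %s\n", i+1, c)
	}
}
```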


“Containerd CRI implementation”
RunPodSandbox
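The RunPodSandbox flow is as follows:
① Pull the sandbox image, which is configured in containerd.
② Determine the runtime the pod will use. It can be specified in the pod's YAML (through runtimeClassName, which maps to a runtime handler). If it is not specified, the default runtime configured in containerd is used.
③ If the pod does not use hostNetwork, create a new network namespace and set up its networking with CNI plugins. criService loads the CNI plugin settings from containerd's cri configuration when it initializes.
④ Call the containerd client to create a container.
⑤ Create a directory named after the pod ID under rootDir/io.containerd.grpc.v1.cri/sandboxes (pkg/cri/cri.go).
⑥ Create a task for the sandbox container using the selected runtime.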
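Steps ② and ③ are small decisions that can be sketched on their own:

- Step ②: pick the runtime from the pod's handler, or fall back to the default.
- Step ③: skip the new network namespace for hostNetwork pods, and otherwise choose where the namespace is mounted.

The netns directory logic follows the `NetNSMountsUnderStateDir` branch visible in the RunPodSandbox code. `runtimeConfig`, `pickRuntime`, and `netnsDir` are hypothetical simplifications. The real runtime selection also considers more configuration than a single map lookup.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// runtimeConfig is a hypothetical, simplified view of containerd's CRI
// runtime configuration: handler name -> runtime type.
type runtimeConfig struct {
	DefaultRuntimeName string
	Runtimes           map[string]string
}

// pickRuntime mirrors step ②: use the pod's runtime handler if given,
// otherwise fall back to the configured default.
func pickRuntime(cfg runtimeConfig, handler string) (string, error) {
	if handler == "" {
		handler = cfg.DefaultRuntimeName
	}
	rt, ok := cfg.Runtimes[handler]
	if !ok {
		return "", fmt.Errorf("no runtime for %q is configured", handler)
	}
	return rt, nil
}

// netnsDir mirrors step ③: host-network pods get no new namespace (""),
// otherwise the mount dir is /var/run/netns or <stateDir>/netns.
func netnsDir(hostNetwork, underStateDir bool, stateDir string) string {
	if hostNetwork {
		return ""
	}
	if underStateDir {
		return filepath.Join(stateDir, "netns")
	}
	return "/var/run/netns"
}

func main() {
	cfg := runtimeConfig{
		DefaultRuntimeName: "runc",
		Runtimes: map[string]string{
			"runc": "io.containerd.runc.v2",
			"kata": "io.containerd.kata.v2",
		},
	}
	rt, err := pickRuntime(cfg, "")
	fmt.Println(rt, err)
	fmt.Printf("%q\n", netnsDir(false, false, "/run/containerd"))
}
```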
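The code is in containerd/pkg/cri/server/sandbox_run.go. In the version shown below, creating and starting the sandbox is delegated to a sandbox controller (controller.Create and controller.Start):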
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (_ *runtime.RunPodSandboxResponse, retErr error) {
	config := r.GetConfig()
	log.G(ctx).Debugf("Sandbox config %+v", config)

	// Generate unique id and name for the sandbox and reserve the name.
	id := util.GenerateID()
	metadata := config.GetMetadata()
	if metadata == nil {
		return nil, errors.New("sandbox config must include metadata")
	}
	name := makeSandboxName(metadata)
	log.G(ctx).WithField("podsandboxid", id).Debugf("generated id for sandbox name %q", name)

	// cleanupErr records the last error returned by the critical cleanup operations in deferred functions,
	// like CNI teardown and stopping the running sandbox task.
	// If cleanup is not completed for some reason, the CRI-plugin will leave the sandbox
	// in a not-ready state, which can later be cleaned up by the next execution of the kubelet's syncPod workflow.
	var cleanupErr error

	// Reserve the sandbox name to avoid concurrent `RunPodSandbox` request starting the
	// same sandbox.
	if err := c.sandboxNameIndex.Reserve(name, id); err != nil {
		return nil, fmt.Errorf("failed to reserve sandbox name %q: %w", name, err)
	}
	defer func() {
		// Release the name if the function returns with an error.
		// When cleanupErr != nil, the name will be cleaned in sandbox_remove.
		if retErr != nil && cleanupErr == nil {
			c.sandboxNameIndex.ReleaseByName(name)
		}
	}()

	var (
		err         error
		sandboxInfo = sb.Sandbox{ID: id}
	)

	ociRuntime, err := c.getSandboxRuntime(config, r.GetRuntimeHandler())
	if err != nil {
		return nil, fmt.Errorf("unable to get OCI runtime for sandbox %q: %w", id, err)
	}

	sandboxInfo.Runtime.Name = ociRuntime.Type

	// Retrieve runtime options
	runtimeOpts, err := generateRuntimeOptions(ociRuntime, c.config)
	if err != nil {
		return nil, fmt.Errorf("failed to generate sandbox runtime options: %w", err)
	}
	...
	// Create initial internal sandbox object.
	sandbox := sandboxstore.NewSandbox(...)

	if _, err := c.client.SandboxStore().Create(ctx, sandboxInfo); err != nil {
		return nil, fmt.Errorf("failed to save sandbox metadata: %w", err)
	}
	...
	// Setup the network namespace if host networking wasn't requested.
	if !hostNetwork(config) {
		netStart := time.Now()
		// If it is not in host network namespace then create a namespace and set the sandbox
		// handle. NetNSPath in sandbox metadata and NetNS is non empty only for non host network
		// namespaces. If the pod is in host network namespace then both are empty and should not
		// be used.
		var netnsMountDir = "/var/run/netns"
		if c.config.NetNSMountsUnderStateDir {
			netnsMountDir = filepath.Join(c.config.StateDir, "netns")
		}
		sandbox.NetNS, err = netns.NewNetNS(netnsMountDir)
		if err != nil {
			return nil, fmt.Errorf("failed to create network namespace for sandbox %q: %w", id, err)
		}
		// Update network namespace in the store, which is used to generate the container's spec
		sandbox.NetNSPath = sandbox.NetNS.GetPath()
		defer func() {
			// Remove the network namespace only if all the resource cleanup is done
			if retErr != nil && cleanupErr == nil {
				if cleanupErr = sandbox.NetNS.Remove(); cleanupErr != nil {
					log.G(ctx).WithError(cleanupErr).Errorf("Failed to remove network namespace %s for sandbox %q", sandbox.NetNSPath, id)
					return
				}
				sandbox.NetNSPath = ""
			}
		}()

		if err := sandboxInfo.AddExtension(podsandbox.MetadataKey, &sandbox.Metadata); err != nil {
			return nil, fmt.Errorf("unable to save sandbox %q to store: %w", id, err)
		}
		// Save sandbox metadata to store
		if sandboxInfo, err = c.client.SandboxStore().Update(ctx, sandboxInfo, "extensions"); err != nil {
			return nil, fmt.Errorf("unable to update extensions for sandbox %q: %w", id, err)
		}

		// Define this defer to teardownPodNetwork prior to the setupPodNetwork function call.
		// This is because in setupPodNetwork the resource is allocated even if it returns error, unlike other resource
		// creation functions.
		defer func() {
			// Remove the network namespace only if all the resource cleanup is done.
			if retErr != nil && cleanupErr == nil {
				deferCtx, deferCancel := ctrdutil.DeferContext()
				defer deferCancel()
				// Teardown network if an error is returned.
				if cleanupErr = c.teardownPodNetwork(deferCtx, sandbox); cleanupErr != nil {
					log.G(ctx).WithError(cleanupErr).Errorf("Failed to destroy network for sandbox %q", id)
				}
			}
		}()

		// Setup network for sandbox.
		// Certain VM based solutions like clear containers (Issue containerd/cri-containerd#524)
		// rely on the assumption that CRI shim will not be querying the network namespace to check the
		// network states such as IP.
		// In future runtime implementation should avoid relying on CRI shim implementation details.
		// In this case however caching the IP will add a subtle performance enhancement by avoiding
		// calls to network namespace of the pod to query the IP of the veth interface on every
		// SandboxStatus request.
		if err := c.setupPodNetwork(ctx, &sandbox); err != nil {
			return nil, fmt.Errorf("failed to setup network for sandbox %q: %w", id, err)
		}
		sandboxCreateNetworkTimer.UpdateSince(netStart)
	}

	if err := sandboxInfo.AddExtension(podsandbox.MetadataKey, &sandbox.Metadata); err != nil {
		return nil, fmt.Errorf("unable to save sandbox %q to store: %w", id, err)
	}

	controller, err := c.getSandboxController(config, r.GetRuntimeHandler())
	if err != nil {
		return nil, fmt.Errorf("failed to get sandbox controller: %w", err)
	}

	// Save sandbox metadata to store
	if sandboxInfo, err = c.client.SandboxStore().Update(ctx, sandboxInfo, "extensions"); err != nil {
		return nil, fmt.Errorf("unable to update extensions for sandbox %q: %w", id, err)
	}

	runtimeStart := time.Now()

	if err := controller.Create(ctx, id); err != nil {
		return nil, fmt.Errorf("failed to create sandbox %q: %w", id, err)
	}

	resp, err := controller.Start(ctx, id)
	if err != nil {
		sandbox.Container, _ = c.client.LoadContainer(ctx, id)
		if resp != nil && resp.SandboxID == "" && resp.Pid == 0 && resp.CreatedAt == nil && len(resp.Labels) == 0 {
			// if resp is a non-nil zero-value, an error occurred during cleanup
			cleanupErr = fmt.Errorf("failed to cleanup sandbox")
		}
		return nil, fmt.Errorf("failed to start sandbox %q: %w", id, err)
	}

	// TODO: get rid of this. sandbox object should no longer have Container field.
	if ociRuntime.SandboxMode == string(criconfig.ModePodSandbox) {
		container, err := c.client.LoadContainer(ctx, id)
		if err != nil {
			return nil, fmt.Errorf("failed to load container %q for sandbox: %w", id, err)
		}
		sandbox.Container = container
	}

	labels := resp.GetLabels()
	if labels == nil {
		labels = map[string]string{}
	}

	sandbox.ProcessLabel = labels["selinux_label"]

	if err := sandbox.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
		// Set the pod sandbox as ready after successfully start sandbox container.
		status.Pid = resp.Pid
		status.State = sandboxstore.StateReady
		status.CreatedAt = protobuf.FromTimestamp(resp.CreatedAt)
		return status, nil
	}); err != nil {
		return nil, fmt.Errorf("failed to update sandbox status: %w", err)
	}

	// Add sandbox into sandbox store in INIT state.
	if err := c.sandboxStore.Add(sandbox); err != nil {
		return nil, fmt.Errorf("failed to add sandbox %+v into store: %w", sandbox, err)
	}

	// Send CONTAINER_CREATED event with both ContainerId and SandboxId equal to SandboxId.
	// Note that this has to be done after sandboxStore.Add() because we need to get
	// SandboxStatus from the store and include it in the event.
	c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_CREATED_EVENT)

	// start the monitor after adding sandbox into the store, this ensures
	// that sandbox is in the store, when event monitor receives the TaskExit event.
	//
	// TaskOOM from containerd may come before sandbox is added to store,
	// but we don't care about sandbox TaskOOM right now, so it is fine.
	go func() {
		resp, err := controller.Wait(ctrdutil.NamespacedContext(), id)
		if err != nil {
			log.G(ctx).WithError(err).Error("failed to wait for sandbox controller, skipping exit event")
			return
		}

		e := &eventtypes.TaskExit{
			ContainerID: id,
			ID:          id,
			// Pid is not used
			Pid:        0,
			ExitStatus: resp.ExitStatus,
			ExitedAt:   resp.ExitedAt,
		}
		c.eventMonitor.backOff.enBackOff(id, e)
	}()

	// Send CONTAINER_STARTED event with ContainerId equal to SandboxId.
	c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_STARTED_EVENT)

	sandboxRuntimeCreateTimer.WithValues(labels["oci_runtime_type"]).UpdateSince(runtimeStart)

	return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil
}
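RunPodSandbox relies on a recurring Go pattern. It reserves a resource first. A deferred function then checks the named return value `retErr` and undoes the reservation only if the function ultimately fails. The sketch below isolates that pattern for the sandbox name. `nameIndex` and `runSandbox` are hypothetical stand-ins for `sandboxNameIndex` and the surrounding function. The real code additionally skips the release when `cleanupErr != nil`, leaving it to sandbox removal.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// nameIndex is a hypothetical stand-in for sandboxNameIndex: a name can be
// reserved by only one ID at a time.
type nameIndex struct {
	mu    sync.Mutex
	names map[string]string // name -> id
}

func newNameIndex() *nameIndex { return &nameIndex{names: map[string]string{}} }

func (n *nameIndex) Reserve(name, id string) error {
	n.mu.Lock()
	defer n.mu.Unlock()
	if owner, ok := n.names[name]; ok {
		return fmt.Errorf("name %q is reserved for %q", name, owner)
	}
	n.names[name] = id
	return nil
}

func (n *nameIndex) ReleaseByName(name string) {
	n.mu.Lock()
	defer n.mu.Unlock()
	delete(n.names, name)
}

// runSandbox reserves the name, then releases it in a deferred func only when
// the function returns an error; the named result retErr makes that possible.
func runSandbox(idx *nameIndex, name, id string, fail bool) (retErr error) {
	if err := idx.Reserve(name, id); err != nil {
		return fmt.Errorf("failed to reserve sandbox name %q: %w", name, err)
	}
	defer func() {
		if retErr != nil {
			idx.ReleaseByName(name)
		}
	}()
	if fail {
		// Simulates a later step (e.g. network setup) failing.
		return errors.New("failed to setup network")
	}
	return nil
}

func main() {
	idx := newNameIndex()
	fmt.Println(runSandbox(idx, "demo-sandbox", "id-1", true))  // fails, name released
	fmt.Println(runSandbox(idx, "demo-sandbox", "id-2", false)) // succeeds, name kept
	fmt.Println(runSandbox(idx, "demo-sandbox", "id-3", false)) // rejected: already reserved
}
```

Because the release is keyed on the final error, a failed attempt never leaves a stale reservation behind. A concurrent request for a name that is genuinely in use is still rejected.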
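CreateContainer creates the metadata for a new container in the specified PodSandbox. The flow is as follows:
① Get the sandbox information for the container
② Resolve the image the container will use and initialize an image handle for it
③ Create a directory named after the container ID under rootDir/io.containerd.grpc.v1.cri
④ Get the runtime used by the sandbox
⑤ Create the containerSpec for the container
⑥ Create the container with the containerd client
⑦ Save the container's information
The code is in containerd/pkg/cri/server/container_create.go. An abridged version follows.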
func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateContainerRequest) (_ *runtime.CreateContainerResponse, retErr error) {
	sandbox, err := c.sandboxStore.Get(r.GetPodSandboxId())
	...
	s, err := sandbox.Container.Task(ctx, nil)
	...
	sandboxPid := s.Pid()
	...
	image, err := c.localResolve(config.GetImage().GetImage())
	if err != nil {
		return nil, errors.Wrapf(err, "failed to resolve image %q", config.GetImage().GetImage())
	}
	containerdImage, err := c.toContainerdImage(ctx, image)
	...
	// Run container using the same runtime with sandbox.
	sandboxInfo, err := sandbox.Container.Info(ctx)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to get sandbox %q info", sandboxID)
	}

	// Create container root directory.
	containerRootDir := c.getContainerRootDir(id)
	if err = c.os.MkdirAll(containerRootDir, 0755); err != nil {
		return nil, errors.Wrapf(err, "failed to create container root directory %q", containerRootDir)
	}
	...
	ociRuntime, err := c.getSandboxRuntime(sandboxConfig, sandbox.Metadata.RuntimeHandler)
	if err != nil {
		return nil, errors.Wrap(err, "failed to get sandbox runtime")
	}

	spec, err := c.containerSpec(id, sandboxID, sandboxPid, sandbox.NetNSPath, containerName, containerdImage.Name(), config, sandboxConfig,
		&image.ImageSpec.Config, append(mounts, volumeMounts...), ociRuntime)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to generate container %q spec", id)
	}
	...
	opts = append(opts,
		containerd.WithSpec(spec, specOpts...),
		containerd.WithRuntime(sandboxInfo.Runtime.Name, runtimeOptions),
		containerd.WithContainerLabels(containerLabels),
		containerd.WithContainerExtension(containerMetadataExtension, &meta))
	var cntr containerd.Container
	if cntr, err = c.client.NewContainer(ctx, id, opts...); err != nil {
		return nil, errors.Wrap(err, "failed to create containerd container")
	}
	...
	// Add container into container store.
	if err := c.containerStore.Add(container); err != nil {
		return nil, errors.Wrapf(err, "failed to add container %q into store", id)
	}
	...
}
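CreateContainer builds its containerd container through functional options (`containerd.WithSpec`, `containerd.WithRuntime`, `containerd.WithContainerLabels`, ...). Each option is a function that mutates the container record before it is saved. The sketch below reproduces that idiom with hypothetical types (`containerInfo`, `newContainerOpt`, `withSpec`, `withRuntime`, `withLabels`). They are not containerd's API, only the same pattern. Note that the result is pure metadata; nothing runs until a task is created.

```go
package main

import (
	"errors"
	"fmt"
)

// containerInfo is a hypothetical metadata record: no process, just data.
type containerInfo struct {
	ID      string
	Spec    string
	Runtime string
	Labels  map[string]string
}

// newContainerOpt mirrors the functional-option style of containerd.NewContainer.
type newContainerOpt func(*containerInfo) error

func withSpec(spec string) newContainerOpt {
	return func(c *containerInfo) error {
		if spec == "" {
			return errors.New("empty spec")
		}
		c.Spec = spec
		return nil
	}
}

func withRuntime(name string) newContainerOpt {
	return func(c *containerInfo) error {
		c.Runtime = name
		return nil
	}
}

func withLabels(labels map[string]string) newContainerOpt {
	return func(c *containerInfo) error {
		if c.Labels == nil {
			c.Labels = map[string]string{}
		}
		for k, v := range labels {
			c.Labels[k] = v
		}
		return nil
	}
}

// newContainer applies every option in order and stops at the first error.
func newContainer(id string, opts ...newContainerOpt) (*containerInfo, error) {
	c := &containerInfo{ID: id}
	for _, o := range opts {
		if err := o(c); err != nil {
			return nil, fmt.Errorf("failed to create container %q: %w", id, err)
		}
	}
	return c, nil
}

func main() {
	c, err := newContainer("abc",
		withSpec(`{"ociVersion":"1.0.2"}`),
		withRuntime("io.containerd.runc.v2"),
		withLabels(map[string]string{"app": "demo"}))
	fmt.Printf("%+v %v\n", c, err)
}
```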
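StartContainer starts a container. It:

1. looks up the container and its sandbox in the CRI stores;
2. reads the container info to find its runtime;
3. creates a containerd task with that runtime's task options;
4. sets up a wait on the task;
5. finally starts the task.

The code is in containerd/pkg/cri/server/container_start.go. An abridged version follows: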
func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContainerRequest) (retRes *runtime.StartContainerResponse, retErr error) {
	cntr, err := c.containerStore.Get(r.GetContainerId())
	...
	id := cntr.ID
	meta := cntr.Metadata
	container := cntr.Container
	...
	// Get sandbox config from sandbox store.
	sandbox, err := c.sandboxStore.Get(meta.SandboxID)
	...
	ctrInfo, err := container.Info(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "failed to get container info")
	}
	...
	taskOpts := c.taskOpts(ctrInfo.Runtime.Name)
	task, err := container.NewTask(ctx, ioCreation, taskOpts...)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create containerd task")
	}
	...
	// wait is a long running background request, no timeout needed.
	exitCh, err := task.Wait(ctrdutil.NamespacedContext())
	...
	// Start containerd task.
	if err := task.Start(ctx); err != nil {
		return nil, errors.Wrapf(err, "failed to start containerd task %q", id)
	}
	...
}
The task-creation code is shown below. It uses containerd's client-side TasksClient to send a create-task request to the server:
func (c *container) NewTask(ctx context.Context, ioCreate cio.Creator, opts ...NewTaskOpts) (_ Task, err error) {
	......
	request := &tasks.CreateTaskRequest{
		ContainerID: c.id,
		Terminal:    cfg.Terminal,
		Stdin:       cfg.Stdin,
		Stdout:      cfg.Stdout,
		Stderr:      cfg.Stderr,
	}
	......
	response, err := c.client.TaskService().Create(ctx, request)
	......
The task-start code is shown below. It likewise uses the TasksClient, this time to send a start-task request to the server:
func (t *task) Start(ctx context.Context) error {
	r, err := t.client.TaskService().Start(ctx, &tasks.StartRequest{
		ContainerID: t.id,
	})
	if err != nil {
		if t.io != nil {
			t.io.Cancel()
			t.io.Close()
		}
		return errdefs.FromGRPC(err)
	}
	t.pid = r.Pid
	return nil
}

“Task Service”
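Below is the code with which the tasks service handles a create-task request. It creates the container through the container's configured runtime: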
func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) {
	container, err := l.getContainer(ctx, r.ContainerID)
	......
	rtime, err := l.getRuntime(container.Runtime.Name)
	if err != nil {
		return nil, err
	}
	_, err = rtime.Get(ctx, r.ContainerID)
	if err != nil && err != runtime.ErrTaskNotExists {
		return nil, errdefs.ToGRPC(err)
	}
	if err == nil {
		return nil, errdefs.ToGRPC(fmt.Errorf("task %s already exists", r.ContainerID))
	}
	c, err := rtime.Create(ctx, r.ContainerID, opts)
	......
	return &api.CreateTaskResponse{
		ContainerID: r.ContainerID,
		Pid:         c.PID(),
	}, nil
}
func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, retErr error) {
	......
	shim, err := m.startShim(ctx, bundle, id, opts)
	......
	t, err := shim.Create(ctx, opts)
	......
}
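The checks in `local.Create` follow a clear order:

1. Resolve the runtime by name.
2. Treat "task not found" as the normal case.
3. Reject the request if a task already exists.
4. Only then delegate to the runtime, which for runtime v2 means starting a shim.

The sketch below reproduces that decision logic with hypothetical types (`runtimeV2`, `taskService`, `errTaskNotExists`). The fake runtime just hands out PIDs instead of launching a shim.

```go
package main

import (
	"errors"
	"fmt"
)

var errTaskNotExists = errors.New("task does not exist")

// runtimeV2 is a hypothetical runtime plugin tracking tasks by container ID.
type runtimeV2 struct{ tasks map[string]int }

func (r *runtimeV2) Get(id string) (int, error) {
	pid, ok := r.tasks[id]
	if !ok {
		return 0, errTaskNotExists
	}
	return pid, nil
}

// Create would start a shim in the real runtime; here it just assigns a PID.
func (r *runtimeV2) Create(id string) (int, error) {
	pid := 1000 + len(r.tasks)
	r.tasks[id] = pid
	return pid, nil
}

type taskService struct{ runtimes map[string]*runtimeV2 }

// createTask mirrors local.Create: resolve runtime, refuse duplicates, delegate.
func (s *taskService) createTask(containerID, runtimeName string) (int, error) {
	rt, ok := s.runtimes[runtimeName]
	if !ok {
		return 0, fmt.Errorf("runtime %q not found", runtimeName)
	}
	_, err := rt.Get(containerID)
	if err != nil && !errors.Is(err, errTaskNotExists) {
		return 0, err
	}
	if err == nil {
		return 0, fmt.Errorf("task %s already exists", containerID)
	}
	return rt.Create(containerID)
}

func main() {
	s := &taskService{runtimes: map[string]*runtimeV2{
		"io.containerd.runc.v2": {tasks: map[string]int{}},
	}}
	fmt.Println(s.createTask("c1", "io.containerd.runc.v2"))
	fmt.Println(s.createTask("c1", "io.containerd.runc.v2"))
}
```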
startShim runs the shim executable, which starts a shim service. The code is:
func (m *TaskManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shim, error) {
	......
	b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)
	shim, err := b.Start(ctx, topts, func() {
		log.G(ctx).WithField("id", id).Info("shim disconnected")
		cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, b)
		m.tasks.Delete(ctx, id)
	})
	......
The executable used to run the shim is named containerd-shim-<runtime>-<version>. For example, the runtime type we normally use is io.containerd.runc.v2, so the executable is containerd-shim-runc-v2. The full command format is:
containerd-shim-runc-v2 -namespace xxxx -address xxxx -publish-binary xxxx -id xxxx start
func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) {
	args := []string{"-id", b.bundle.ID}
	args = append(args, "start")
	cmd, err := client.Command(
		ctx,
		b.runtime,
		b.containerdAddress,
		b.containerdTTRPCAddress,
		b.bundle.Path,
		opts,
		args...,
	)
	......
	out, err := cmd.CombinedOutput()
	if err != nil {
		return nil, errors.Wrapf(err, "%s", out)
	}
	address := strings.TrimSpace(string(out))
	conn, err := client.Connect(address, client.AnonDialer)
	if err != nil {
		return nil, err
	}
	onCloseWithShimLog := func() {
		onClose()
		cancelShimLog()
		f.Close()
	}
	client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog))
	return &shim{
		bundle: b.bundle,
		client: client,
	}, nil
}
The tasks service starts a task as follows:
func (l *local) Start(ctx context.Context, r *api.StartRequest, _ ...grpc.CallOption) (*api.StartResponse, error) {
	t, err := l.getTask(ctx, r.ContainerID)
	if err != nil {
		return nil, err
	}
	p := runtime.Process(t)
	if r.ExecID != "" {
		if p, err = t.Process(ctx, r.ExecID); err != nil {
			return nil, errdefs.ToGRPC(err)
		}
	}
	if err := p.Start(ctx); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	state, err := p.State(ctx)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return &api.StartResponse{
		Pid: state.Pid,
	}, nil
}
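`local.Start` serves two cases with one RPC:

- An empty `ExecID` starts the task's own (init) process.
- A non-empty `ExecID` starts an additional process previously added with exec.

The sketch below models that selection with hypothetical `process` and `task` types. It does not use containerd's runtime interfaces.

```go
package main

import (
	"errors"
	"fmt"
)

// process is a hypothetical stand-in for a runtime process.
type process struct {
	pid     uint32
	started bool
}

func (p *process) Start() error {
	if p.started {
		return errors.New("process already started")
	}
	p.started = true
	return nil
}

// task holds the init process plus exec'd processes keyed by exec ID.
type task struct {
	init  *process
	execs map[string]*process
}

// startProcess mirrors local.Start: empty execID -> init process,
// otherwise the exec process with that ID.
func startProcess(tk *task, execID string) (uint32, error) {
	p := tk.init
	if execID != "" {
		var ok bool
		if p, ok = tk.execs[execID]; !ok {
			return 0, fmt.Errorf("exec %q not found", execID)
		}
	}
	if err := p.Start(); err != nil {
		return 0, err
	}
	return p.pid, nil
}

func main() {
	tk := &task{
		init:  &process{pid: 100},
		execs: map[string]*process{"e1": {pid: 200}},
	}
	fmt.Println(startProcess(tk, ""))
	fmt.Println(startProcess(tk, "e1"))
}
```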
The container's process is started by sending a request to the shim server:
func (s *shim) Start(ctx context.Context) error {
	response, err := s.task.Start(ctx, &task.StartRequest{
		ID: s.ID(),
	})
	if err != nil {
		return errdefs.FromGRPC(err)
	}
	s.taskPid = int(response.Pid)
	return nil
}

“Containerd-shim startup flow”
RunManager(ctx context.Context, manager Manager, opts ...BinaryOpts) in containerd/runtime/v2/shim/shim.go is the code entry point for containerd-shim-runc-v2 start:
switch action {
...
case "start":
	opts := StartOpts{
		Address:      addressFlag,
		TTRPCAddress: ttrpcAddress,
		Debug:        debugFlag,
	}
	address, err := manager.Start(ctx, id, opts)
	if err != nil {
		return err
	}
	if _, err := os.Stdout.WriteString(address); err != nil {
		return err
	}
	return nil
}
The containerd-shim-runc-v2 start process in turn launches another process, containerd-shim-runc-v2 -namespace xxxx -id xxxx -address xxxx, which runs the shim server.
func (manager) Start(ctx context.Context, id string, opts shim.StartOpts) (_ string, retErr error) {
	cmd, err := newCommand(ctx, id, opts.Address, opts.TTRPCAddress, opts.Debug)
	...
	// make sure that reexec shim-v2 binary use the value if need
	if err := shim.WriteAddress("address", address); err != nil {
		return "", err
	}
	...
	if err := cmd.Start(); err != nil {
		f.Close()
		return "", err
	}
	...
	// make sure to wait after start
	go cmd.Wait()
	...
	return address, nil
}

// The re-executed process (launched without the "start" subcommand) then
// runs the shim server in run() in runtime/v2/shim/shim.go:
	...
	server, err := newServer(ttrpc.WithUnaryServerInterceptor(unaryInterceptor))
	if err != nil {
		return fmt.Errorf("failed creating server: %w", err)
	}
	for _, srv := range ttrpcServices {
		if err := srv.RegisterTTRPC(server); err != nil {
			return fmt.Errorf("failed to register service: %w", err)
		}
	}
	if err := serve(ctx, server, signals, sd.Shutdown); err != nil {
		if err != shutdown.ErrShutdown {
			return err
		}
	}
The shim server is a ttrpc service that provides the following interface:
type TaskService interface {
	State(context.Context, *StateRequest) (*StateResponse, error)
	Create(context.Context, *CreateTaskRequest) (*CreateTaskResponse, error)
	Start(context.Context, *StartRequest) (*StartResponse, error)
	Delete(context.Context, *DeleteRequest) (*DeleteResponse, error)
	Pids(context.Context, *PidsRequest) (*PidsResponse, error)
	Pause(context.Context, *PauseRequest) (*emptypb.Empty, error)
	Resume(context.Context, *ResumeRequest) (*emptypb.Empty, error)
	Checkpoint(context.Context, *CheckpointTaskRequest) (*emptypb.Empty, error)
	Kill(context.Context, *KillRequest) (*emptypb.Empty, error)
	Exec(context.Context, *ExecProcessRequest) (*emptypb.Empty, error)
	ResizePty(context.Context, *ResizePtyRequest) (*emptypb.Empty, error)
	CloseIO(context.Context, *CloseIORequest) (*emptypb.Empty, error)
	Update(context.Context, *UpdateTaskRequest) (*emptypb.Empty, error)
	Wait(context.Context, *WaitRequest) (*WaitResponse, error)
	Stats(context.Context, *StatsRequest) (*StatsResponse, error)
	Connect(context.Context, *ConnectRequest) (*ConnectResponse, error)
	Shutdown(context.Context, *ShutdownRequest) (*emptypb.Empty, error)
}
Creating a task runs the command runc create --bundle xxxx xxxx. See the code:
func (r *Runc) Create(context context.Context, id, bundle string, opts *CreateOpts) error {
	args := []string{"create", "--bundle", bundle}
	......
	cmd := r.command(context, append(args, id)...)
	......
	ec, err := Monitor.Start(cmd)
	......
}
Starting a task runs the command runc start xxxx. See the code:
func (r *Runc) Start(context context.Context, id string) error {
	return r.runOrError(r.command(context, "start", id))
}
Summary
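The kubelet's sandbox-creation flow is summarized below:

1. The kubelet calls the CRI RPC RunPodSandbox on containerd's cri plugin.
2. The cri plugin goes through the containerd client to the tasks service.
3. The tasks service's TaskManager.startShim runs containerd-shim-runc-v2 start.
4. The shim ttrpc server runs runc create, then runc start.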






