今回のテーマは高レベルコンテナランタイムcontainerdの内部処理についてです。
こちらは、2021/05時点の記事です。
Architecture
containerdは、Moduler Monolith Archtectureと呼ばれるマイクロサービスのようなアーキテクチャを採用しています。
具体的には、単一バイナリが内部的に複数のServiceに分割され、Service間がAPIでアクセスする仕組みになっています。
各ServiceはPluginとして起動時に登録され、主にgRPCと呼ばれるGoogleが開発を開始したリモートプロシージャコール (RPC) システムでAPI実装されています。
メジャーなcontainerd clientは、Kubernetesのノードエージェントであるkubeletやcontainerdに付属する専用clientがあります。
前者はCRI(Container Runtime Interface)と呼ばれるKubernetes API、後者はcontainerd APIを採用していますが、いずれもgRPCベースで実装されています。
特にcontainerdは、CRI ServiceというかたちでKubernetes APIをサポートしているため、Kubernetesランタイムとしての存在感を増しています。
さて、containerdでは下記の手順でコンテナが起動されます。
- Imageのpull:client.Pull
- Containerの作成:client.NewContainer
- Taskの作成:container.NewTask
- ステータス通知用channelの取得:task.Wait
- Taskの起動:task.Start
Server視点では、コンテナイメージがプルされるとコンテナイメージのデータがContentに格納され、ImagesやContainersなどのメタデータで管理されます。
また、低レベルコンテナランタイムに渡されるコンテナのルートファイルシステムであるsnapshotが作成され、コンテナがTaskとして起動されます。
上記にあるとおり、containerdにはContaierとTaskという二つのコンテナ概念があり、実際に低レベルコンテナランタイムを制御しているのはTaskになります。
コンテナの実行処理では、Task Serviceがshimと呼ばれるサーバをコンテナ単位で起動し、shimがruncを制御することでコンテナを起動します。
- Contaier:containerdにおけるメタデータとしてのコンテナ
- Task:containerdにおけるプロセスとしてのコンテナ
Task Serviceとshimの間はttRPCという別のリモートプロシージャコールで実装されています。
Scope
containerdには、コンテナイメージの管理からコンテナの実行まで様々な機能があります。
今回は、Taskの作成、つまりコンテナのプロセス作成処理にフォーカスを当てます。
具体的には、Task作成のAPIを提供するTask Serviceがshimを起動して、shimがruncコマンドを実行してコンテナを生成するところまでになります。
tasks service:Create
containerdにおいてAPIを提供するServiceはcontainerd/servicesで実装されています。
また、APIの定義はcontainerd/api/servicesの.protoファイルにあります。
/* containerd/api/services/tasks/v1/tasks.proto */ service Tasks { rpc Create(CreateTaskRequest) returns (CreateTaskResponse); rpc Start(StartRequest) returns (StartResponse); rpc Delete(DeleteTaskRequest) returns (DeleteResponse); rpc DeleteProcess(DeleteProcessRequest) returns (DeleteResponse); ... } message CreateTaskRequest { ... } message CreateTaskResponse { ... }
tasks serviceに対してgRPCのTask Createメソッドがコールされると、containerd/services/tasks/local.goに実装されたCreateメソッドが処理されます。
内部的にはTaskManager.CreateメソッドがTaskの作成を行っています。
上記処理の完了後は、gRPCレスポンスがgRPCクライアントに送信されます。
/*containerd/services/tasks/local.go*/ type local struct { runtimes map[string]runtime.PlatformRuntime containers containers.Store store content.Store publisher events.Publisher monitor runtime.TaskMonitor v2Runtime *v2.TaskManager } ... func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) { container, err := l.getContainer(ctx, r.ContainerID) ... //TaskManager.Createのオプション設定 opts := runtime.CreateOpts{ Spec: container.Spec, IO: runtime.IO{ Stdin: r.Stdin, Stdout: r.Stdout, Stderr: r.Stderr, Terminal: r.Terminal, }, Checkpoint: checkpointPath, Runtime: container.Runtime.Name, RuntimeOptions: container.Runtime.Options, TaskOptions: r.Options, } ... //TaskManagerの取得 rtime, err := l.getRuntime(container.Runtime.Name) ... //TaskManager.Createでタスクを作成 c, err := rtime.Create(ctx, r.ContainerID, opts) ... //gRPCレスポンス return &api.CreateTaskResponse{ ContainerID: r.ContainerID, Pid: c.PID(), }, nil }
TaskManager.Create
TaskManager.Createメソッドを確認すると、OCI runtime bundleの作成とcontainerd-shimの起動が行われています。
/* containerd/runtime/v2/manager.go */ func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, retErr error) { //OCI runtime bundleの作成 bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec.Value) if err != nil { return nil, err } ... //containerd-shimの起動 shim, err := m.startShim(ctx, bundle, id, opts) if err != nil { return nil, err } ... }
startShim
startShimではshimを起動して戻り値としてttRPCクライアントを格納したshimインスタンスを取得します。
/* containerd/runtime/v2/manager.go */ func (m *TaskManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shim, error) { ns, err := namespaces.NamespaceRequired(ctx) topts := opts.TaskOptions b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks) //shimを起動して戻り値としてttRPC接続先を取得 shim, err := b.Start(ctx, topts, func() ... return shim, nil } /* containerd/runtime/v2/binary.go */ func shimBinary(ctx context.Context, bundle *Bundle, runtime, containerdAddress string, containerdTTRPCAddress string, events *exchange.Exchange, rt *runtime.TaskList) *binary { return &binary{ bundle: bundle, runtime: runtime, containerdAddress: containerdAddress, containerdTTRPCAddress: containerdTTRPCAddress, events: events, rtTasks: rt, } }
shimBinary.Startメソッドでは、shim起動の起動コマンドが作成され、shimが起動されます。
shimの起動コマンドには、shimのオプションや低レベルランタイムであるruncに渡されるOCIバンドルの情報が格納されています。
/* containerd/runtime/v2/binary.go */ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) { args := []string{"-id", b.bundle.ID} args = append(args, "start") //shim起動用のコマンド構築(shimのコマンド、OCIバンドルの情報など) cmd, err := client.Command( ctx, b.runtime, b.containerdAddress, b.containerdTTRPCAddress, b.bundle.Path, opts, args..., ) //shim起動コマンドの実行 //outはttRPCの接続先アドレス out, err := cmd.CombinedOutput() //アドレスの生成 address := strings.TrimSpace(string(out)) //コネクションの作成 conn, err := client.Connect(address, client.AnonDialer) //shimに接続するためのttRPCクライアントの取得 client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog)) return &shim{ bundle: b.bundle, client: client, task: task.NewTaskClient(client), events: b.events, rtTasks: b.rtTasks, }, nil }
shim起動コマンドの実行はCombinedOutputメソッドで行われ、内部ではosexec.Cmdでshimが起動されています。
また、TaskManagerがshimとやり取りするためのttRPCの接続先アドレスも提供されます。
次に、CombinedOutputで得られた接続先アドレスから、shimとのコネクションを作成し、shimに接続するためのttRPCクライアントを作成しています。
func (cmd *cmdWrapper) CombinedOutput() ([]byte, error) { out, err := (*osexec.Cmd)(cmd).CombinedOutput() return out, handleError(err) }
最後に、shim.Createメソッドでtaskの登録(コンテナの作成)が行われます。
shim内部では、runcが起動され、コンテナが作成されていますが、実行はされません。
/* containerd/runtime/v2/manager.go */ func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, retErr error) { //OCI runtime bundleの作成 bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec.Value) if err != nil { return nil, err } ... //containerd-shimの起動+ttRPC接続 shim, err := m.startShim(ctx, bundle, id, opts) if err != nil { return nil, err } //ttRPC経由でshimにtaskを登録(コンテナは起動していない) t, err := shim.Create(ctx, opts) return t, nil }
shim.Create
以降、shim.Createからrunc createコマンドが実行されてコンテナが作成されるまでの過程を追います。
shim.Createメソッドでは、s.task.Createで「shim」のttRPC:Createが呼び出されます。
/* containerd/runtime/v2/shim.go */ type shim struct { bundle *Bundle client *ttrpc.Client task task.TaskService taskPid int events *exchange.Exchange rtTasks *runtime.TaskList } /* containerd/runtime/v2/shim.go */ func (s *shim) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Task, error) { topts := opts.TaskOptions ... request := &task.CreateTaskRequest{ ID: s.ID(), Bundle: s.bundle.Path, Stdin: opts.IO.Stdin, Stdout: opts.IO.Stdout, Stderr: opts.IO.Stderr, Terminal: opts.IO.Terminal, Checkpoint: opts.Checkpoint, Options: topts, } for _, m := range opts.Rootfs { request.Rootfs = append(request.Rootfs, &types.Mount{ Type: m.Type, Source: m.Source, Options: m.Options, }) } response, err := s.task.Create(ctx, request) ... s.taskPid = int(response.Pid) return s, nil }
ttRPCのtask.Createはcontainerd/runtime/v2/taskで定義されています。
/*containerd/runtime/v2/task/shim.proto*/ service Task { rpc State(StateRequest) returns (StateResponse); rpc Create(CreateTaskRequest) returns (CreateTaskResponse); rpc Start(StartRequest) returns (StartResponse); rpc Delete(DeleteRequest) returns (DeleteResponse); rpc Pids(PidsRequest) returns (PidsResponse); rpc Pause(PauseRequest) returns (google.protobuf.Empty); rpc Resume(ResumeRequest) returns (google.protobuf.Empty); rpc Checkpoint(CheckpointTaskRequest) returns (google.protobuf.Empty); rpc Kill(KillRequest) returns (google.protobuf.Empty); rpc Exec(ExecProcessRequest) returns (google.protobuf.Empty); rpc ResizePty(ResizePtyRequest) returns (google.protobuf.Empty); rpc CloseIO(CloseIORequest) returns (google.protobuf.Empty); rpc Update(UpdateTaskRequest) returns (google.protobuf.Empty); rpc Wait(WaitRequest) returns (WaitResponse); rpc Stats(StatsRequest) returns (StatsResponse); rpc Connect(ConnectRequest) returns (ConnectResponse); rpc Shutdown(ShutdownRequest) returns (google.protobuf.Empty); } message CreateTaskRequest { string id = 1; string bundle = 2; repeated containerd.types.Mount rootfs = 3; bool terminal = 4; string stdin = 5; string stdout = 6; string stderr = 7; string checkpoint = 8; string parent_checkpoint = 9; google.protobuf.Any options = 10; } func (c *taskClient) Create(ctx context.Context, req *CreateTaskRequest) (*CreateTaskResponse, error) { var resp CreateTaskResponse if err := c.client.Call(ctx, "containerd.task.v2.Task", "Create", req, &resp); err != nil { ... } return &resp, nil }
shim側のtask.Createはcontainerd/runtime/v2/runc/v2/service.goに実装されています。
/* containerd/runtime/v2/runc/v2/service.go */ // Create a new initial process and container with the underlying OCI runtime func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) { //コンテナの作成 container, err := runc.NewContainer(ctx, s.platform, r) ... //ttRPCのレスポンス return &taskAPI.CreateTaskResponse{ Pid: uint32(container.Pid()), }, nil } //runc.Container type Container struct { mu sync.Mutex // ID of the container ID string // Bundle path Bundle string // cgroup is either cgroups.Cgroup or *cgroupsv2.Manager cgroup interface{} process process.Process processes map[string]process.Process reservedProcess map[string]struct{} }
NewContainer
NewContainerメソッドではrunc.Containerが作成されます。
NewContainerメソッドで重要なのは下記2つの処理です。
①コンテナ内initプロセスの管理インスタンスを生成するnewInitメソッド
②上記で作成されたInitインスタンスがコンテナを作成するCreateメソッド
/* containerd/runtime/v2/runc/container.go */ package runc // NewContainer returns a new runc container func NewContainer(ctx context.Context, platform stdio.Platform, r *task.CreateTaskRequest) (_ *Container, retErr error) { ns, err := namespaces.NamespaceRequired(ctx) ... var mounts []process.Mount for _, m := range r.Rootfs { mounts = append(mounts, process.Mount{ Type: m.Type, Source: m.Source, Target: m.Target, Options: m.Options, }) } rootfs := "" if len(mounts) > 0 { rootfs = filepath.Join(r.Bundle, "rootfs") if err := os.Mkdir(rootfs, 0711); err != nil && !os.IsExist(err) { return nil, err } } config := &process.CreateConfig{ ID: r.ID, Bundle: r.Bundle, Runtime: opts.BinaryName, Rootfs: mounts, Terminal: r.Terminal, Stdin: r.Stdin, Stdout: r.Stdout, Stderr: r.Stderr, Checkpoint: r.Checkpoint, ParentCheckpoint: r.ParentCheckpoint, Options: r.Options, } //コンテナ内のinitプロセスインスタンス p, err := newInit( ctx, r.Bundle, filepath.Join(r.Bundle, "work"), ns, platform, config, &opts, rootfs, ) //initプロセスのCreateメソッドでコンテナを作成 if err := p.Create(ctx, config); err != nil { return nil, errdefs.ToGRPC(err) } container := &Container{ ID: r.ID, Bundle: r.Bundle, process: p, processes: make(map[string]process.Process), reservedProcess: make(map[string]struct{}), } ... return container, nil }
newInit
newInitメソッドではInitインスタンスが作成されます。
処理を確認するとNewRuncメソッドでRuncインスタンスが作成され、process.NewメソッドでInitインスタンスを作成しています。
/* containerd/runtime/v2/runc/container.go */ func newInit(ctx context.Context, path, workDir, namespace string, platform stdio.Platform, r *process.CreateConfig, options *options.Options, rootfs string) (*process.Init, error) { //Runcのインスタンスを生成 runtime := process.NewRunc(options.Root, path, namespace, options.BinaryName, options.CriuPath, options.SystemdCgroup) p := process.New(r.ID, runtime, stdio.Stdio{ Stdin: r.Stdin, Stdout: r.Stdout, Stderr: r.Stderr, Terminal: r.Terminal, }) p.Bundle = r.Bundle p.Platform = platform p.Rootfs = rootfs p.WorkDir = workDir p.IoUID = int(options.IoUid) p.IoGID = int(options.IoGid) p.NoPivotRoot = options.NoPivotRoot p.NoNewKeyring = options.NoNewKeyring p.CriuWorkPath = options.CriuWorkPath if p.CriuWorkPath == "" { // if criu work path not set, use container WorkDir p.CriuWorkPath = p.WorkDir } return p, nil }
NewRunc
NewRuncメソッドはRuncインスタンスを作成します。
Runcインスタンスは、runcのサブコマンドに対応したメソッドがあり、runcを実行する機能があります。
そのため、このRuncインスタンスがcontainerdにおけるruncの現身といえます。
/* containerd/pkg/process/init.go */ // NewRunc returns a new runc instance for a process func NewRunc(root, path, namespace, runtime, criu string, systemd bool) *runc.Runc { if root == "" { root = RuncRoot } return &runc.Runc{ Command: runtime, Log: filepath.Join(path, "log.json"), LogFormat: runc.JSON, PdeathSignal: unix.SIGKILL, Root: filepath.Join(root, namespace), Criu: criu, SystemdCgroup: systemd, } } /* vendor/github.com/containerd/go-runc/runc_unix.go */ // Runc is the client to the runc cli type Runc struct { //If command is empty, DefaultCommand is used Command string Root string Debug bool Log string LogFormat Format PdeathSignal unix.Signal Setpgid bool Criu string SystemdCgroup bool Rootless *bool // nil stands for "auto" } /* containerd/vendor/github.com/containerd/go-runc/runc.go */ // List returns all containers created inside the provided runc root directory /* containerd/vendor/github.com/containerd/go-runc/runc.go */ func (r *Runc) List(context context.Context) (*Container, error) { ... } // Create creates a new container and returns its pid if it was created successfully func (r *Runc) Create(context context.Context, id, bundle string, opts *CreateOpts) error { args := string{"create", "--bundle", bundle} ... } // Kill sends the specified signal to the container func (r *Runc) Kill(context context.Context, id string, sig int, opts *KillOpts) error { ... } //runc startの実行 // Start will start an already created container func (r *Runc) Start(context context.Context, id string) error { return r.runOrError(r.command(context, "start", id)) }
process.New
process.NewではInitインスタンスが作成されます。
このinitインスタンスにはCreateメソッドやStartメソッドなど、Initプロセスの操作を行うメソッドが用意されています。
/* containerd/pkg/process/init.go */ //process.New func New(id string, runtime *runc.Runc, stdio stdio.Stdio) *Init { p := &Init{ id: id, runtime: runtime, pausing: new(atomicBool), stdio: stdio, status: 0, waitBlock: make(chan struct{}), } ... return p } /* containerd/pkg/process/init.go */ // Init represents an initial process for a container type Init struct { ... id string Bundle string console console.Console Platform stdio.Platform io *processIO runtime *runc.Runc status int exited time.Time pid int stdin io.Closer stdio stdio.Stdio Rootfs string IoUID int IoGID int NoPivotRoot bool } // Create the process with the provided config func (p *Init) Create(ctx context.Context, r *CreateConfig) error { ... } // Start the init process func (p *Init) Start(ctx context.Context) error { ... }
Init.Create
ここでNewContainerメソッドに戻ります。
newInitメソッドでinitインスタンスが作成された後に、Createメソッドが呼ばれています。
これが前述の (p *Init) Createメソッドに該当します。
func NewContainer(ctx context.Context, platform stdio.Platform, r *task.CreateTaskRequest) (_ *Container, retErr error) { ... //コンテナ内のinitプロセスインスタンス p, err := newInit( ctx, r.Bundle, filepath.Join(r.Bundle, "work"), ns, platform, config, &opts, rootfs, ) ... //InitインスタンスのCreateメソッドでコンテナを作成 if err := p.Create(ctx, config); err != nil { return nil, errdefs.ToGRPC(err) } ... }
Runc.Create
処理を確認するとさらに内部でp.runtime.Createメソッドを呼び出していることが分かります。
このruntimeはRuncインスタンスであり、同じくCreateメソッドが定義されています。
// Create the process with the provided config func (p *Init) Create(ctx context.Context, r *CreateConfig) error { ... if err := p.runtime.Create(ctx, r.ID, r.Bundle, opts); err != nil { ... } ... pid, err := pidFile.Read() ... p.pid = pid return nil }
Runc.Createメソッドの処理を確認すると、先頭でrunc create --bundle bundleというコマンドを生成しています。
OCIイメージとOCIランタイムバンドル - 鳩小屋
これは、runcがOCI runtime bundleからコンテナを起動するコマンドになります。
次に、Monitor.Startメソッドにcmdが指定されて呼び出されていることが確認できます。
// Create creates a new container and returns its pid if it was created successfully func (r *Runc) Create(context context.Context, id, bundle string, opts *CreateOpts) error { //runc createコマンドの生成 args := []string{"create", "--bundle", bundle} ... cmd := r.command(context, append(args, id)...) ... ec, err := Monitor.Start(cmd) ... status, err := Monitor.Wait(cmd, ec) ... return err }
Monitor.Startメソッドでは、exec.Cmd.Startが指定されてrunc createが実行されていることが分かります。
これで無事コンテナが起動されました。
// Start starts the command a registers the process with the reaper func (m *Monitor) Start(c *exec.Cmd) (chan runc.Exit, error) { ec := m.Subscribe() if err := c.Start(); err != nil { ... } return ec, nil }
あとは、shimがTask Serviceに対してttRPCレスポンスを返し、Task ServiceがクライアントにgRPCレスポンスを返して一連の処理が完了します。
また、Task Createでは、コンテナは作成されますが、実行はされません。
コンテナを実行するには、Task Serviceに対してTask Startを呼出します。
こちらの処理も同様にshim経由でruncが制御される流れになります。(たぶん)