鳩小屋

落書き帳

High-level Container Runtime:containerd Internals

今回のテーマは高レベルコンテナランタイムcontainerdの内部処理についてです。
こちらは、2021/05時点の記事です。

Architecture

f:id:FallenPigeon:20210505113040p:plain
f:id:FallenPigeon:20210505122601p:plain

containerdは、Moduler Monolith Archtectureと呼ばれるマイクロサービスのようなアーキテクチャを採用しています。
具体的には、単一バイナリが内部的に複数のServiceに分割され、Service間がAPIでアクセスする仕組みになっています。
各ServiceはPluginとして起動時に登録され、主にgRPCと呼ばれるGoogleが開発を開始したリモートプロシージャコール (RPC) システムでAPI実装されています。

メジャーなcontainerd clientは、Kubernetesのノードエージェントであるkubeletやcontainerdに付属する専用clientがあります。
前者はCRI(Container Runtime Interface)と呼ばれるKubernetes API、後者はcontainerd APIを採用していますが、いずれもgRPCベースで実装されています。
特にcontainerdは、CRI ServiceというかたちでKubernetes APIをサポートしているため、Kubernetesランタイムとしての存在感を増しています。

さて、containerdでは下記の手順でコンテナが起動されます。

  1. Imageのpull:client.Pull
  2. Containerの作成:client.NewContainer
  3. Taskの作成:container.NewTask
  4. ステータス通知用channelの取得:task.Wait
  5. Taskの起動:task.Start

f:id:FallenPigeon:20210505122157p:plain

Server視点では、コンテナイメージがプルされるとコンテナイメージのデータがContentに格納され、ImagesやContainersなどのメタデータで管理されます。
また、低レベルコンテナランタイムに渡されるコンテナのルートファイルシステムであるsnapshotが作成され、コンテナがTaskとして起動されます。

上記にあるとおり、containerdにはContaierとTaskという二つのコンテナ概念があり、実際に低レベルコンテナランタイムを制御しているのはTaskになります。
コンテナの実行処理では、Task Serviceがshimと呼ばれるサーバをコンテナ単位で起動し、shimがruncを制御することでコンテナを起動します。

  • Contaier:containerdにおけるメタデータとしてのコンテナ
  • Task:containerdにおけるプロセスとしてのコンテナ

Task Serviceとshimの間はttRPCという別のリモートプロシージャコールで実装されています。

Scope

containerdには、コンテナイメージの管理からコンテナの実行まで様々な機能があります。
今回は、Taskの作成、つまりコンテナのプロセス作成処理にフォーカスを当てます。
具体的には、Task作成のAPIを提供するTask Serviceがshimを起動して、shimがruncコマンドを実行してコンテナを生成するところまでになります。

f:id:FallenPigeon:20210505113054p:plain

tasks service:Create

containerdにおいてAPIを提供するServiceはcontainerd/servicesで実装されています。
また、APIの定義はcontainerd/api/servicesの.protoファイルにあります。

/* containerd/api/services/tasks/v1/tasks.proto */
service Tasks {
	rpc Create(CreateTaskRequest) returns (CreateTaskResponse);
	rpc Start(StartRequest) returns (StartResponse);
	rpc Delete(DeleteTaskRequest) returns (DeleteResponse);
	rpc DeleteProcess(DeleteProcessRequest) returns (DeleteResponse);
	...
}

message CreateTaskRequest {
	...
}

message CreateTaskResponse {
	...
}

tasks serviceに対してgRPCのTask Createメソッドがコールされると、containerd/services/tasks/local.goに実装されたCreateメソッドが処理されます。
内部的にはTaskManager.CreateメソッドがTaskの作成を行っています。
上記処理の完了後は、gRPCレスポンスがgRPCクライアントに送信されます。

/*containerd/services/tasks/local.go*/
type local struct {
	runtimes   map[string]runtime.PlatformRuntime
	containers containers.Store
	store      content.Store
	publisher  events.Publisher

	monitor   runtime.TaskMonitor
	v2Runtime *v2.TaskManager
}
...
func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) {
	container, err := l.getContainer(ctx, r.ContainerID)
	...
	//TaskManager.Createのオプション設定
	opts := runtime.CreateOpts{
		Spec: container.Spec,
		IO: runtime.IO{
			Stdin:    r.Stdin,
			Stdout:   r.Stdout,
			Stderr:   r.Stderr,
			Terminal: r.Terminal,
		},
		Checkpoint:     checkpointPath,
		Runtime:        container.Runtime.Name,
		RuntimeOptions: container.Runtime.Options,
		TaskOptions:    r.Options,
	}
	...
	//TaskManagerの取得
	rtime, err := l.getRuntime(container.Runtime.Name)
	...
	//TaskManager.Createでタスクを作成
	c, err := rtime.Create(ctx, r.ContainerID, opts)
	...
	//gRPCレスポンス
	return &api.CreateTaskResponse{
		ContainerID: r.ContainerID,
		Pid:         c.PID(),
	}, nil
}

TaskManager.Create

TaskManager.Createメソッドを確認すると、OCI runtime bundleの作成とcontainerd-shimの起動が行われています。

/* containerd/runtime/v2/manager.go */
func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, retErr error) {

	//OCI runtime bundleの作成
	bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec.Value)
	if err != nil {
		return nil, err
	}
	...
	//containerd-shimの起動
	shim, err := m.startShim(ctx, bundle, id, opts)
	if err != nil {
		return nil, err
	}
	...
}

startShim

startShimではshimを起動して戻り値としてttRPCクライアントを格納したshimインスタンスを取得します。

/* containerd/runtime/v2/manager.go */
func (m *TaskManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shim, error) {
	ns, err := namespaces.NamespaceRequired(ctx)
	topts := opts.TaskOptions

	b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)

	//shimを起動して戻り値としてttRPC接続先を取得
	shim, err := b.Start(ctx, topts, func()
	...
	return shim, nil
}

/* containerd/runtime/v2/binary.go */
func shimBinary(ctx context.Context, bundle *Bundle, runtime, containerdAddress string, containerdTTRPCAddress string, events *exchange.Exchange, rt *runtime.TaskList) *binary {
	return &binary{
		bundle:                 bundle,
		runtime:                runtime,
		containerdAddress:      containerdAddress,
		containerdTTRPCAddress: containerdTTRPCAddress,
		events:                 events,
		rtTasks:                rt,
	}
}

shimBinary.Startメソッドでは、shim起動の起動コマンドが作成され、shimが起動されます。
shimの起動コマンドには、shimのオプションや低レベルランタイムであるruncに渡されるOCIバンドルの情報が格納されています。

/* containerd/runtime/v2/binary.go */
func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) {
	args := []string{"-id", b.bundle.ID}
	args = append(args, "start")

	//shim起動用のコマンド構築(shimのコマンド、OCIバンドルの情報など)
	cmd, err := client.Command(
		ctx,
		b.runtime,
		b.containerdAddress,
		b.containerdTTRPCAddress,
		b.bundle.Path,
		opts,
		args...,
	)
	//shim起動コマンドの実行
	//outはttRPCの接続先アドレス
	out, err := cmd.CombinedOutput()

	//アドレスの生成
	address := strings.TrimSpace(string(out))
	//コネクションの作成
	conn, err := client.Connect(address, client.AnonDialer)

	//shimに接続するためのttRPCクライアントの取得
	client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog))
	return &shim{
		bundle:  b.bundle,
		client:  client,
		task:    task.NewTaskClient(client),
		events:  b.events,
		rtTasks: b.rtTasks,
	}, nil
}

shim起動コマンドの実行はCombinedOutputメソッドで行われ、内部ではosexec.Cmdでshimが起動されています。
また、TaskManagerがshimとやり取りするためのttRPCの接続先アドレスも提供されます。
次に、CombinedOutputで得られた接続先アドレスから、shimとのコネクションを作成し、shimに接続するためのttRPCクライアントを作成しています。

func (cmd *cmdWrapper) CombinedOutput() ([]byte, error) {
	out, err := (*osexec.Cmd)(cmd).CombinedOutput()
	return out, handleError(err)
}


最後に、shim.Createメソッドでtaskの登録(コンテナの作成)が行われます。
shim内部では、runcが起動され、コンテナが作成されていますが、実行はされません。

/* containerd/runtime/v2/manager.go */
func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, retErr error) {

	//OCI runtime bundleの作成
	bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec.Value)
	if err != nil {
		return nil, err
	}
	...
	//containerd-shimの起動+ttRPC接続
	shim, err := m.startShim(ctx, bundle, id, opts)
	if err != nil {
		return nil, err
	}
	//ttRPC経由でshimにtaskを登録(コンテナは起動していない)
	t, err := shim.Create(ctx, opts)

	return t, nil
}

shim.Create

以降、shim.Createからrunc createコマンドが実行されてコンテナが作成されるまでの過程を追います。
shim.Createメソッドでは、s.task.Createで「shim」のttRPC:Createが呼び出されます。

/* containerd/runtime/v2/shim.go */
type shim struct {
	bundle  *Bundle
	client  *ttrpc.Client
	task    task.TaskService
	taskPid int
	events  *exchange.Exchange
	rtTasks *runtime.TaskList
}

/* containerd/runtime/v2/shim.go */
func (s *shim) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Task, error) {
	topts := opts.TaskOptions
	...
	request := &task.CreateTaskRequest{
		ID:         s.ID(),
		Bundle:     s.bundle.Path,
		Stdin:      opts.IO.Stdin,
		Stdout:     opts.IO.Stdout,
		Stderr:     opts.IO.Stderr,
		Terminal:   opts.IO.Terminal,
		Checkpoint: opts.Checkpoint,
		Options:    topts,
	}
	for _, m := range opts.Rootfs {
		request.Rootfs = append(request.Rootfs, &types.Mount{
			Type:    m.Type,
			Source:  m.Source,
			Options: m.Options,
		})
	}
	response, err := s.task.Create(ctx, request)
	...
	s.taskPid = int(response.Pid)
	return s, nil
}

ttRPCのtask.Createはcontainerd/runtime/v2/taskで定義されています。

/*containerd/runtime/v2/task/shim.proto*/
service Task {
	rpc State(StateRequest) returns (StateResponse);
	rpc Create(CreateTaskRequest) returns (CreateTaskResponse);
	rpc Start(StartRequest) returns (StartResponse);
	rpc Delete(DeleteRequest) returns (DeleteResponse);
	rpc Pids(PidsRequest) returns (PidsResponse);
	rpc Pause(PauseRequest) returns (google.protobuf.Empty);
	rpc Resume(ResumeRequest) returns (google.protobuf.Empty);
	rpc Checkpoint(CheckpointTaskRequest) returns (google.protobuf.Empty);
	rpc Kill(KillRequest) returns (google.protobuf.Empty);
	rpc Exec(ExecProcessRequest) returns (google.protobuf.Empty);
	rpc ResizePty(ResizePtyRequest) returns (google.protobuf.Empty);
	rpc CloseIO(CloseIORequest) returns (google.protobuf.Empty);
	rpc Update(UpdateTaskRequest) returns (google.protobuf.Empty);
	rpc Wait(WaitRequest) returns (WaitResponse);
	rpc Stats(StatsRequest) returns (StatsResponse);
	rpc Connect(ConnectRequest) returns (ConnectResponse);
	rpc Shutdown(ShutdownRequest) returns (google.protobuf.Empty);
}

message CreateTaskRequest {
	string id = 1;
	string bundle = 2;
	repeated containerd.types.Mount rootfs = 3;
	bool terminal = 4;
	string stdin = 5;
	string stdout = 6;
	string stderr = 7;
	string checkpoint = 8;
	string parent_checkpoint = 9;
	google.protobuf.Any options = 10;
}

func (c *taskClient) Create(ctx context.Context, req *CreateTaskRequest) (*CreateTaskResponse, error) {
	var resp CreateTaskResponse
	if err := c.client.Call(ctx, "containerd.task.v2.Task", "Create", req, &resp); err != nil {
		...
	}
	return &resp, nil
}

shim側のtask.Createはcontainerd/runtime/v2/runc/v2/service.goに実装されています。

/* containerd/runtime/v2/runc/v2/service.go */
// Create a new initial process and container with the underlying OCI runtime
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
	
	//コンテナの作成
	container, err := runc.NewContainer(ctx, s.platform, r)
	...
	//ttRPCのレスポンス
	return &taskAPI.CreateTaskResponse{
		Pid: uint32(container.Pid()),
	}, nil
}

//runc.Container
type Container struct {
	mu sync.Mutex

	// ID of the container
	ID string
	// Bundle path
	Bundle string

	// cgroup is either cgroups.Cgroup or *cgroupsv2.Manager
	cgroup          interface{}
	process         process.Process
	processes       map[string]process.Process
	reservedProcess map[string]struct{}
}

NewContainer

NewContainerメソッドではrunc.Containerが作成されます。
NewContainerメソッドで重要なのは下記2つの処理です。
①コンテナ内initプロセスの管理インスタンスを生成するnewInitメソッド
②上記で作成されたInitインスタンスがコンテナを作成するCreateメソッド

/* containerd/runtime/v2/runc/container.go */
package runc

// NewContainer returns a new runc container
func NewContainer(ctx context.Context, platform stdio.Platform, r *task.CreateTaskRequest) (_ *Container, retErr error) {
	ns, err := namespaces.NamespaceRequired(ctx)
	...
	var mounts []process.Mount
	for _, m := range r.Rootfs {
		mounts = append(mounts, process.Mount{
			Type:    m.Type,
			Source:  m.Source,
			Target:  m.Target,
			Options: m.Options,
		})
	}

	rootfs := ""
	if len(mounts) > 0 {
		rootfs = filepath.Join(r.Bundle, "rootfs")
		if err := os.Mkdir(rootfs, 0711); err != nil && !os.IsExist(err) {
			return nil, err
		}
	}

	config := &process.CreateConfig{
		ID:               r.ID,
		Bundle:           r.Bundle,
		Runtime:          opts.BinaryName,
		Rootfs:           mounts,
		Terminal:         r.Terminal,
		Stdin:            r.Stdin,
		Stdout:           r.Stdout,
		Stderr:           r.Stderr,
		Checkpoint:       r.Checkpoint,
		ParentCheckpoint: r.ParentCheckpoint,
		Options:          r.Options,
	}

	//コンテナ内のinitプロセスインスタンス
	p, err := newInit(
		ctx,
		r.Bundle,
		filepath.Join(r.Bundle, "work"),
		ns,
		platform,
		config,
		&opts,
		rootfs,
	)

	//initプロセスのCreateメソッドでコンテナを作成
	if err := p.Create(ctx, config); err != nil {
		return nil, errdefs.ToGRPC(err)
	}

	container := &Container{
		ID:              r.ID,
		Bundle:          r.Bundle,
		process:         p,
		processes:       make(map[string]process.Process),
		reservedProcess: make(map[string]struct{}),
	}
	...
	return container, nil
}
newInit

newInitメソッドではInitインスタンスが作成されます。
処理を確認するとNewRuncメソッドでRuncインスタンスが作成され、process.NewメソッドでInitインスタンスを作成しています。

/* containerd/runtime/v2/runc/container.go */
func newInit(ctx context.Context, path, workDir, namespace string, platform stdio.Platform,
	r *process.CreateConfig, options *options.Options, rootfs string) (*process.Init, error) {

	//Runcのインスタンスを生成
	runtime := process.NewRunc(options.Root, path, namespace, options.BinaryName, options.CriuPath, options.SystemdCgroup)
	p := process.New(r.ID, runtime, stdio.Stdio{
		Stdin:    r.Stdin,
		Stdout:   r.Stdout,
		Stderr:   r.Stderr,
		Terminal: r.Terminal,
	})
	p.Bundle = r.Bundle
	p.Platform = platform
	p.Rootfs = rootfs
	p.WorkDir = workDir
	p.IoUID = int(options.IoUid)
	p.IoGID = int(options.IoGid)
	p.NoPivotRoot = options.NoPivotRoot
	p.NoNewKeyring = options.NoNewKeyring
	p.CriuWorkPath = options.CriuWorkPath
	if p.CriuWorkPath == "" {
		// if criu work path not set, use container WorkDir
		p.CriuWorkPath = p.WorkDir
	}
	return p, nil
}
NewRunc

NewRuncメソッドはRuncインスタンスを作成します。
Runcインスタンスは、runcのサブコマンドに対応したメソッドがあり、runcを実行する機能があります。
そのため、このRuncインスタンスがcontainerdにおけるruncの現身といえます。

/* containerd/pkg/process/init.go */
// NewRunc returns a new runc instance for a process
func NewRunc(root, path, namespace, runtime, criu string, systemd bool) *runc.Runc {
	if root == "" {
		root = RuncRoot
	}
	return &runc.Runc{
		Command:       runtime,
		Log:           filepath.Join(path, "log.json"),
		LogFormat:     runc.JSON,
		PdeathSignal:  unix.SIGKILL,
		Root:          filepath.Join(root, namespace),
		Criu:          criu,
		SystemdCgroup: systemd,
	}
}

/* vendor/github.com/containerd/go-runc/runc_unix.go */
// Runc is the client to the runc cli
type Runc struct {
	//If command is empty, DefaultCommand is used
	Command       string
	Root          string
	Debug         bool
	Log           string
	LogFormat     Format
	PdeathSignal  unix.Signal
	Setpgid       bool
	Criu          string
	SystemdCgroup bool
	Rootless      *bool // nil stands for "auto"
}

/* containerd/vendor/github.com/containerd/go-runc/runc.go */

// List returns all containers created inside the provided runc root directory
/* containerd/vendor/github.com/containerd/go-runc/runc.go */
func (r *Runc) List(context context.Context) (*Container, error) {
	...
}

// Create creates a new container and returns its pid if it was created successfully
func (r *Runc) Create(context context.Context, id, bundle string, opts *CreateOpts) error {
	args := string{"create", "--bundle", bundle}
	...
}

// Kill sends the specified signal to the container
func (r *Runc) Kill(context context.Context, id string, sig int, opts *KillOpts) error {
	...
}

//runc startの実行
// Start will start an already created container
func (r *Runc) Start(context context.Context, id string) error {
	return r.runOrError(r.command(context, "start", id))
}
process.New

process.NewではInitインスタンスが作成されます。
このinitインスタンスにはCreateメソッドやStartメソッドなど、Initプロセスの操作を行うメソッドが用意されています。

/* containerd/pkg/process/init.go */
//process.New
func New(id string, runtime *runc.Runc, stdio stdio.Stdio) *Init {
	p := &Init{
		id:        id,
		runtime:   runtime,
		pausing:   new(atomicBool),
		stdio:     stdio,
		status:    0,
		waitBlock: make(chan struct{}),
	}
	...
	return p
}

/* containerd/pkg/process/init.go */
// Init represents an initial process for a container
type Init struct {
	...
	id       string
	Bundle   string
	console  console.Console
	Platform stdio.Platform
	io       *processIO
	runtime  *runc.Runc
	status       int
	exited       time.Time
	pid          int
	stdin        io.Closer
	stdio        stdio.Stdio
	Rootfs       string
	IoUID        int
	IoGID        int
	NoPivotRoot  bool
}

// Create the process with the provided config
func (p *Init) Create(ctx context.Context, r *CreateConfig) error {
	...
}

// Start the init process
func (p *Init) Start(ctx context.Context) error {
	...
}
Init.Create

ここでNewContainerメソッドに戻ります。
newInitメソッドでinitインスタンスが作成された後に、Createメソッドが呼ばれています。
これが前述の (p *Init) Createメソッドに該当します。

func NewContainer(ctx context.Context, platform stdio.Platform, r *task.CreateTaskRequest) (_ *Container, retErr error) {
	...
	//コンテナ内のinitプロセスインスタンス
	p, err := newInit(
		ctx,
		r.Bundle,
		filepath.Join(r.Bundle, "work"),
		ns,
		platform,
		config,
		&opts,
		rootfs,
	)
	...
	//InitインスタンスのCreateメソッドでコンテナを作成
	if err := p.Create(ctx, config); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	...
}
Runc.Create

処理を確認するとさらに内部でp.runtime.Createメソッドを呼び出していることが分かります。
このruntimeはRuncインスタンスであり、同じくCreateメソッドが定義されています。

// Create the process with the provided config
func (p *Init) Create(ctx context.Context, r *CreateConfig) error {
	...
	if err := p.runtime.Create(ctx, r.ID, r.Bundle, opts); err != nil {
		...
	}
	...
	pid, err := pidFile.Read()
	...
	p.pid = pid
	return nil
}

Runc.Createメソッドの処理を確認すると、先頭でrunc create --bundle bundleというコマンドを生成しています。
OCIイメージとOCIランタイムバンドル - 鳩小屋
これは、runcがOCI runtime bundleからコンテナを起動するコマンドになります。
次に、Monitor.Startメソッドにcmdが指定されて呼び出されていることが確認できます。

// Create creates a new container and returns its pid if it was created successfully
func (r *Runc) Create(context context.Context, id, bundle string, opts *CreateOpts) error {
        //runc createコマンドの生成
	args := []string{"create", "--bundle", bundle}
	...
	cmd := r.command(context, append(args, id)...)
	...
	ec, err := Monitor.Start(cmd)
	...
	status, err := Monitor.Wait(cmd, ec)
	...
	return err
}

Monitor.Startメソッドでは、exec.Cmd.Startが指定されてrunc createが実行されていることが分かります。
これで無事コンテナが起動されました。

// Start starts the command a registers the process with the reaper
func (m *Monitor) Start(c *exec.Cmd) (chan runc.Exit, error) {
	ec := m.Subscribe()
	if err := c.Start(); err != nil {
		...
	}
	return ec, nil
}

あとは、shimがTask Serviceに対してttRPCレスポンスを返し、Task ServiceがクライアントにgRPCレスポンスを返して一連の処理が完了します。
また、Task Createでは、コンテナは作成されますが、実行はされません。
コンテナを実行するには、Task Serviceに対してTask Startを呼出します。
こちらの処理も同様にshim経由でruncが制御される流れになります。(たぶん)