暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

使用 client-go 实现 kubectl cp 命令

ProdanLabs 2021-07-11
6269

在 kubernetes 中,如果需要从 pod 拷贝文件到当前主机,或者把当前主机的文件拷贝到 pod ,需要使用到 kubectl cp
 命令。

root@k8s-dev-master:~# kubectl cp --help
Copy files and directories to and from containers.

Examples:
  # !!!Important Note!!!
  # Requires that the 'tar' binary is present in your container
  # image.  If 'tar' is not present, 'kubectl cp' will fail.
  #
  # For advanced use cases, such as symlinks, wildcard expansion or
  # file mode preservation consider using 'kubectl exec'.

  # Copy /tmp/foo local file to /tmp/bar in a remote pod in namespace <some-namespace>
  tar cf - /tmp/foo | kubectl exec -i -n <some-namespace> <some-pod> -- tar xf - -C /tmp/bar

  # Copy /tmp/foo from a remote pod to /tmp/bar locally
  kubectl exec -n <some-namespace> <some-pod> -- tar cf - /tmp/foo | tar xf - -C /tmp/bar

  # Copy /tmp/foo_dir local directory to /tmp/bar_dir in a remote pod in the default namespace
  kubectl cp /tmp/foo_dir <some-pod>:/tmp/bar_dir

  # Copy /tmp/foo local file to /tmp/bar in a remote pod in a specific container
  kubectl cp /tmp/foo <some-pod>:/tmp/bar -c <specific-container>

  # Copy /tmp/foo local file to /tmp/bar in a remote pod in namespace <some-namespace>
  kubectl cp /tmp/foo <some-namespace>/<some-pod>:/tmp/bar

  # Copy /tmp/foo from a remote pod to /tmp/bar locally
  kubectl cp <some-namespace>/<some-pod>:/tmp/foo /tmp/bar

Options:
  -c, --container='': Container name. If omitted, use the kubectl.kubernetes.io/default-container annotation for
selecting the container to be attached or the first container in the pod will be chosen
      --no-preserve=false: The copied file/directory's ownership and permissions will not be preserved in the container

Usage:
  kubectl cp <file-spec-src> <file-spec-dest> [options]

Use "kubectl options" for a list of global command-line options (applies to all commands).


通过帮助信息我们可以知道 kubectl cp
 命令的用法,如果 Pod 有多个容器,那么需要使用
-c
指定容器名。


那么 kubectl cp
 命令是怎么实现的呢?


01

kubectl cp 原理

kubectl cp
命令的实现其实比较简单,就是将文件或文件夹抽象成一个数据流 (Stream),然后通过 kube-apiserver 进行文件传输。源码位于:

https://github.com/kubernetes/kubectl/blob/master/pkg/cmd/cp/cp.go

tar cf - -> io.Writer -> kubernetes api -> io.Reader -> unTar

从 Pod 复制到当前主机

func (o *CopyOptions) copyFromPod(src, dest fileSpec) error {
    if len(src.File) == 0 || len(dest.File) == 0 {
        return errFileCannotBeEmpty
    }

    reader, outStream := io.Pipe()
    options := &exec.ExecOptions{
        StreamOptions: exec.StreamOptions{
            IOStreams: genericclioptions.IOStreams{
                In:     nil,
                Out:    outStream,
                ErrOut: o.Out,
            },

            Namespace: src.PodNamespace,
            PodName:   src.PodName,
        },

        // TODO: Improve error messages by first testing if 'tar' is present in the container?
        Command:  []string{"tar""cf""-", src.File},
        Executor: &exec.DefaultRemoteExecutor{},
    }

    go func() {
        defer outStream.Close()
        cmdutil.CheckErr(o.execute(options))
    }()
    prefix := getPrefix(src.File)
    prefix = path.Clean(prefix)
    // remove extraneous path shortcuts - these could occur if a path contained extra "../"
    // and attempted to navigate beyond "/" in a remote filesystem
    prefix = stripPathShortcuts(prefix)
    return o.untarAll(src, reader, dest.File, prefix)
}


可以看到是通过 exec
接口,使用 tar -cf -
  命令把文件或文件夹转换成数据流,其实就是一个 tar
包,然后读取数据流,使用 unTarAll
 函数遍历 tar 包,然后在当前主机目标位置创建对应的文件或文件夹,相当于 tar -xf -
命令。  


我们也可以来看看转换成数据流的文件结构

root@k8s-dev-master:~# kubectl exec -it nginx-6fc95cbdfc-dlnt6 -- sh
# mkdir test
# echo "hello world" >test/file.log 
# tar cf - test/
test/0000755000000000000000000000000014072551604010537 5ustar  rootroottest/file.log0000644000000000000000000000001414072551604012154 0ustar  rootroothello world

可以看到数据流中包含文件属性与权限等内容,比如分别用 5ustar
 和  0ustar
 表示目录和文件。


从当前主机复制 Pod 

func (o *CopyOptions) copyToPod(src, dest fileSpec, options *exec.ExecOptions) error {
    if len(src.File) == 0 || len(dest.File) == 0 {
        return errFileCannotBeEmpty
    }
    if _, err := os.Stat(src.File); err != nil {
        return fmt.Errorf("%s doesn't exist in local filesystem", src.File)
    }
    reader, writer := io.Pipe()

    // strip trailing slash (if any)
    if dest.File != "/" && strings.HasSuffix(string(dest.File[len(dest.File)-1]), "/") {
        dest.File = dest.File[:len(dest.File)-1]
    }

    if err := o.checkDestinationIsDir(dest); err == nil {
        // If no error, dest.File was found to be a directory.
        // Copy specified src into it
        dest.File = dest.File + "/" + path.Base(src.File)
    }

    go func() {
        defer writer.Close()
        cmdutil.CheckErr(makeTar(src.File, dest.File, writer))
    }()
    var cmdArr []string

    // TODO: Improve error messages by first testing if 'tar' is present in the container?
    if o.NoPreserve {
        cmdArr = []string{"tar""--no-same-permissions""--no-same-owner""-xmf""-"}
    } else {
        cmdArr = []string{"tar""-xmf""-"}
    }
    destDir := path.Dir(dest.File)
    if len(destDir) > 0 {
        cmdArr = append(cmdArr, "-C", destDir)
    }

    options.StreamOptions = exec.StreamOptions{
        IOStreams: genericclioptions.IOStreams{
            In:     reader,
            Out:    o.Out,
            ErrOut: o.ErrOut,
        },
        Stdin: true,

        Namespace: dest.PodNamespace,
        PodName:   dest.PodName,
    }

    options.Command = cmdArr
    options.Executor = &exec.DefaultRemoteExecutor{}
    return o.execute(options)
}

其实跟从 Pod 复制到当前主机是差不多的,使用 makeTar
 方法把文件转换成(Stream),然后使用容器里面的 tar -xmf -
 命令进行解压。


看了 kubectl cp
的核心源码, 可以看出使用 cp 子命令是依赖容器里面的 tar 命令,所以 Pod 和宿主机之间能拷贝文件或文件夹的前提是容器必须有 tar 二进制文件。


02

使用 client-go 实现 cp 示例

参考 kubectl cp 就可以实现了,直接放码过来了....

package main

import (
    "archive/tar"
    "flag"
    "fmt"
    "io"
    "io/ioutil"
    "os"
    "path"
    "path/filepath"
    "strings"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/tools/remotecommand"
    cmdutil "k8s.io/kubectl/pkg/cmd/util"
)

var (
    KubeQPS            = float32(5.000000)
    KubeBurst          = 10
    kubeConfig         *string
    AcceptContentTypes = "application/json"
    ContentType        = "application/json"
)

func setKubeConfig(config *rest.Config) {
    config.QPS = KubeQPS
    config.Burst = KubeBurst
    config.ContentType = ContentType
    config.AcceptContentTypes = AcceptContentTypes
    config.UserAgent = rest.DefaultKubernetesUserAgent()
}

// InitKubeConfig 初始化 k8s api 连接配置
func InitKubeConfig(env bool) (*rest.Config, error) {

    if !env {
        if kubeConfig != nil {
            config, err := clientcmd.BuildConfigFromFlags("", *kubeConfig)
            if err != nil {
                panic(err.Error())
            }
            setKubeConfig(config)
            return config, err
        }
        if home := homeDir(); home != "" {
            kubeConfig = flag.String("kubeconfig", filepath.Join(home, ".kube""config"), "(optional) absolute path to the kubeconfig file")
        } else {
            kubeConfig = flag.String("kubeconfig""""absolute path to the kubeconfig file")
        }
        config, err := clientcmd.BuildConfigFromFlags("", *kubeConfig)
        if err != nil {
            panic(err.Error())
        }
        setKubeConfig(config)
        return config, err

    } else {
        config, err := rest.InClusterConfig()
        if err != nil {
            panic(err.Error())
        }

        if err != nil {
            panic(err)
        }
        setKubeConfig(config)
        return config, err
    }
}

func homeDir() string {
    if h := os.Getenv("HOME"); h != "" {
        return h
    }
    return os.Getenv("USERPROFILE"// windows
}

// NewClientSet ClientSet 客户端
func NewClientSet(c *rest.Config) (*kubernetes.Clientset, error) {
    clientSet, err := kubernetes.NewForConfig(c)
    return clientSet, err
}

func main() {

    // 实例化 k8s 客户端
    kubeConfig, err := InitKubeConfig(false)
    if err != nil {
        fmt.Errorf("ERROR: %s", err)
    }
    clientSet, err := NewClientSet(kubeConfig)
    if err != nil {
        fmt.Errorf("ERROR: %s", err)
    }
    err = copyFromPod(kubeConfig, clientSet)
    if err != nil {
        fmt.Errorf("ERROR: %s", err)
    }

    err = copyToPod(kubeConfig, clientSet)
    if err != nil {
        fmt.Errorf("ERROR: %s", err)
    }

}

func getPrefix(file string) string {
    return strings.TrimLeft(file, "/")
}

// stripPathShortcuts removes any leading or trailing "../" from a given path
func stripPathShortcuts(p string) string {

    newPath := path.Clean(p)
    trimmed := strings.TrimPrefix(newPath, "../")

    for trimmed != newPath {
        newPath = trimmed
        trimmed = strings.TrimPrefix(newPath, "../")
    }

    // trim leftover {".", ".."}
    if newPath == "." || newPath == ".." {
        newPath = ""
    }

    if len(newPath) > 0 && string(newPath[0]) == "/" {
        return newPath[1:]
    }

    return newPath
}

func unTarAll(reader io.Reader, destDir, prefix string) error {
    tarReader := tar.NewReader(reader)
    for {
        header, err := tarReader.Next()
        if err != nil {
            if err != io.EOF {
                return err
            }
            break
        }

        if !strings.HasPrefix(header.Name, prefix) {
            return fmt.Errorf("tar contents corrupted")
        }

        mode := header.FileInfo().Mode()
        destFileName := filepath.Join(destDir, header.Name[len(prefix):])

        baseName := filepath.Dir(destFileName)
        if err := os.MkdirAll(baseName, 0755); err != nil {
            return err
        }
        if header.FileInfo().IsDir() {
            if err := os.MkdirAll(destFileName, 0755); err != nil {
                return err
            }
            continue
        }

        evaledPath, err := filepath.EvalSymlinks(baseName)
        if err != nil {
            return err
        }

        if mode&os.ModeSymlink != 0 {
            linkname := header.Linkname

            if !filepath.IsAbs(linkname) {
                _ = filepath.Join(evaledPath, linkname)
            }

            if err := os.Symlink(linkname, destFileName); err != nil {
                return err
            }
        } else {
            outFile, err := os.Create(destFileName)
            if err != nil {
                return err
            }
            defer outFile.Close()
            if _, err := io.Copy(outFile, tarReader); err != nil {
                return err
            }
            if err := outFile.Close(); err != nil {
                return err
            }
        }
    }

    return nil
}

// copyFromPod 从 pod 复制文件到本地
func copyFromPod(r *rest.Config, c *kubernetes.Clientset) error {
    reader, outStream := io.Pipe()

    // 初始化pod所在的 coreV1 资源组,发送请求
    req := c.CoreV1().RESTClient().Get().
        Resource("pods").
        Name("nginx-6fc95cbdfc-dlnt6").
        Namespace("default").
        SubResource("exec").
        VersionedParams(&corev1.PodExecOptions{
            // 将数据转换成数据流
            Command: []string{"tar""cf""-""/etc/nginx"},
            Stdin:   true,
            Stdout:  true,
            Stderr:  true,
            TTY:     false,
        }, scheme.ParameterCodec)

    // remotecommand 主要实现了http 转 SPDY 添加X-Stream-Protocol-Version相关header 并发送请求
    exec, err := remotecommand.NewSPDYExecutor(r, "POST", req.URL())
    if err != nil {
        return err
    }

    go func() {
        defer outStream.Close()
        err = exec.Stream(remotecommand.StreamOptions{
            Stdin:  os.Stdin,
            Stdout: outStream,
            Stderr: os.Stderr,
            Tty:    false,
        })
        cmdutil.CheckErr(err)
    }()

    prefix := getPrefix("/etc/nginx")
    prefix = path.Clean(prefix)
    prefix = stripPathShortcuts(prefix)
    destPath := path.Join("./", path.Base(prefix))
    err = unTarAll(reader, destPath, prefix)

    return nil
}

func copyToPod(r *rest.Config, c *kubernetes.Clientset) error {
    reader, writer := io.Pipe()
    go func() {
        defer writer.Close()
        cmdutil.CheckErr(makeTar("./demo""/demo", writer))
    }()

    req := c.CoreV1().RESTClient().Post().
        Resource("pods").
        Name("nginx-6fc95cbdfc-dlnt6").
        Namespace("default").
        SubResource("exec").
        VersionedParams(&corev1.PodExecOptions{
            Command: []string{"tar""-xmf""-"},
            Stdin:   true,
            Stdout:  true,
            Stderr:  true,
            TTY:     false,
        }, scheme.ParameterCodec)

    exec, err := remotecommand.NewSPDYExecutor(r, "POST", req.URL())
    if err != nil {
        return err
    }

    err = exec.Stream(remotecommand.StreamOptions{
        Stdin:  reader,
        Stdout: os.Stdout,
        Stderr: os.Stderr,
        Tty:    false,
    })
    if err != nil {
        return err
    }
    return nil
}

func makeTar(srcPath, destPath string, writer io.Writer) error {
    // TODO: use compression here?
    tarWriter := tar.NewWriter(writer)
    defer tarWriter.Close()

    srcPath = path.Clean(srcPath)
    destPath = path.Clean(destPath)
    return recursiveTar(path.Dir(srcPath), path.Base(srcPath), path.Dir(destPath), path.Base(destPath), tarWriter)
}

func recursiveTar(srcBase, srcFile, destBase, destFile string, tw *tar.Writer) error {
    srcPath := path.Join(srcBase, srcFile)
    matchedPaths, err := filepath.Glob(srcPath)
    if err != nil {
        return err
    }
    for _, fpath := range matchedPaths {
        stat, err := os.Lstat(fpath)
        if err != nil {
            return err
        }
        if stat.IsDir() {
            files, err := ioutil.ReadDir(fpath)
            if err != nil {
                return err
            }
            if len(files) == 0 {
                //case empty directory
                hdr, _ := tar.FileInfoHeader(stat, fpath)
                hdr.Name = destFile
                if err := tw.WriteHeader(hdr); err != nil {
                    return err
                }
            }
            for _, f := range files {
                if err := recursiveTar(srcBase, path.Join(srcFile, f.Name()), destBase, path.Join(destFile, f.Name()), tw); err != nil {
                    return err
                }
            }
            return nil
        } else if stat.Mode()&os.ModeSymlink != 0 {
            //case soft link
            hdr, _ := tar.FileInfoHeader(stat, fpath)
            target, err := os.Readlink(fpath)
            if err != nil {
                return err
            }

            hdr.Linkname = target
            hdr.Name = destFile
            if err := tw.WriteHeader(hdr); err != nil {
                return err
            }
        } else {
            //case regular file or other file type like pipe
            hdr, err := tar.FileInfoHeader(stat, fpath)
            if err != nil {
                return err
            }
            hdr.Name = destFile

            if err := tw.WriteHeader(hdr); err != nil {
                return err
            }

            f, err := os.Open(fpath)
            if err != nil {
                return err
            }
            defer f.Close()

            if _, err := io.Copy(tw, f); err != nil {
                return err
            }
            return f.Close()
        }
    }
    return nil
}





文章转载自ProdanLabs,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论