
在 kubernetes 中,如果需要从 pod 拷贝文件到当前主机,或者把当前主机的文件拷贝到 pod ,需要使用到 kubectl cp
命令。
root@k8s-dev-master:~# kubectl cp --help
Copy files and directories to and from containers.
Examples:
# !!!Important Note!!!
# Requires that the 'tar' binary is present in your container
# image. If 'tar' is not present, 'kubectl cp' will fail.
#
# For advanced use cases, such as symlinks, wildcard expansion or
# file mode preservation consider using 'kubectl exec'.
# Copy /tmp/foo local file to /tmp/bar in a remote pod in namespace <some-namespace>
tar cf - /tmp/foo | kubectl exec -i -n <some-namespace> <some-pod> -- tar xf - -C /tmp/bar
# Copy /tmp/foo from a remote pod to /tmp/bar locally
kubectl exec -n <some-namespace> <some-pod> -- tar cf - /tmp/foo | tar xf - -C /tmp/bar
# Copy /tmp/foo_dir local directory to /tmp/bar_dir in a remote pod in the default namespace
kubectl cp /tmp/foo_dir <some-pod>:/tmp/bar_dir
# Copy /tmp/foo local file to /tmp/bar in a remote pod in a specific container
kubectl cp /tmp/foo <some-pod>:/tmp/bar -c <specific-container>
# Copy /tmp/foo local file to /tmp/bar in a remote pod in namespace <some-namespace>
kubectl cp /tmp/foo <some-namespace>/<some-pod>:/tmp/bar
# Copy /tmp/foo from a remote pod to /tmp/bar locally
kubectl cp <some-namespace>/<some-pod>:/tmp/foo /tmp/bar
Options:
-c, --container='': Container name. If omitted, use the kubectl.kubernetes.io/default-container annotation for
selecting the container to be attached or the first container in the pod will be chosen
--no-preserve=false: The copied file/directory's ownership and permissions will not be preserved in the container
Usage:
kubectl cp <file-spec-src> <file-spec-dest> [options]
Use "kubectl options" for a list of global command-line options (applies to all commands).
通过帮助信息我们可以知道 kubectl cp
命令的用法,如果 Pod 有多个容器,那么需要使用 -c
指定容器名。
那么 kubectl cp
命令是怎么实现的呢?
kubectl cp 原理
kubectl cp
命令的实现其实比较简单,就是将文件或文件夹抽象成一个数据流 (Stream),然后通过 kube-apiserver 进行文件传输。源码位于:
https://github.com/kubernetes/kubectl/blob/master/pkg/cmd/cp/cp.go
tar cf - -> io.Writer -> kubernetes api -> io.Reader -> unTar
从 Pod 复制到当前主机
func (o *CopyOptions) copyFromPod(src, dest fileSpec) error {
if len(src.File) == 0 || len(dest.File) == 0 {
return errFileCannotBeEmpty
}
reader, outStream := io.Pipe()
options := &exec.ExecOptions{
StreamOptions: exec.StreamOptions{
IOStreams: genericclioptions.IOStreams{
In: nil,
Out: outStream,
ErrOut: o.Out,
},
Namespace: src.PodNamespace,
PodName: src.PodName,
},
// TODO: Improve error messages by first testing if 'tar' is present in the container?
Command: []string{"tar", "cf", "-", src.File},
Executor: &exec.DefaultRemoteExecutor{},
}
go func() {
defer outStream.Close()
cmdutil.CheckErr(o.execute(options))
}()
prefix := getPrefix(src.File)
prefix = path.Clean(prefix)
// remove extraneous path shortcuts - these could occur if a path contained extra "../"
// and attempted to navigate beyond "/" in a remote filesystem
prefix = stripPathShortcuts(prefix)
return o.untarAll(src, reader, dest.File, prefix)
}
可以看到是通过 exec
接口,使用 tar -cf -
命令把文件或文件夹转换成数据流,其实就是一个 tar
包,然后读取数据流,使用 unTarAll
函数遍历 tar 包,然后在当前主机目标位置创建对应的文件或文件夹,相当于 tar -xf -
命令。
我们也可以来看看转换成数据流的文件结构
root@k8s-dev-master:~# kubectl exec -it nginx-6fc95cbdfc-dlnt6 -- sh
/ # mkdir test
/ # echo "hello world" >test/file.log
/ # tar cf - test/
test/0000755000000000000000000000000014072551604010537 5ustar rootroottest/file.log0000644000000000000000000000001414072551604012154 0ustar rootroothello world
可以看到数据流中包含文件属性与权限等内容,比如分别用 5ustar
和 0ustar
表示目录和文件。
从当前主机复制到 Pod
func (o *CopyOptions) copyToPod(src, dest fileSpec, options *exec.ExecOptions) error {
if len(src.File) == 0 || len(dest.File) == 0 {
return errFileCannotBeEmpty
}
if _, err := os.Stat(src.File); err != nil {
return fmt.Errorf("%s doesn't exist in local filesystem", src.File)
}
reader, writer := io.Pipe()
// strip trailing slash (if any)
if dest.File != "/" && strings.HasSuffix(string(dest.File[len(dest.File)-1]), "/") {
dest.File = dest.File[:len(dest.File)-1]
}
if err := o.checkDestinationIsDir(dest); err == nil {
// If no error, dest.File was found to be a directory.
// Copy specified src into it
dest.File = dest.File + "/" + path.Base(src.File)
}
go func() {
defer writer.Close()
cmdutil.CheckErr(makeTar(src.File, dest.File, writer))
}()
var cmdArr []string
// TODO: Improve error messages by first testing if 'tar' is present in the container?
if o.NoPreserve {
cmdArr = []string{"tar", "--no-same-permissions", "--no-same-owner", "-xmf", "-"}
} else {
cmdArr = []string{"tar", "-xmf", "-"}
}
destDir := path.Dir(dest.File)
if len(destDir) > 0 {
cmdArr = append(cmdArr, "-C", destDir)
}
options.StreamOptions = exec.StreamOptions{
IOStreams: genericclioptions.IOStreams{
In: reader,
Out: o.Out,
ErrOut: o.ErrOut,
},
Stdin: true,
Namespace: dest.PodNamespace,
PodName: dest.PodName,
}
options.Command = cmdArr
options.Executor = &exec.DefaultRemoteExecutor{}
return o.execute(options)
}
其实跟从 Pod 复制到当前主机是差不多的,使用 makeTar
方法把文件转换成流(Stream),然后使用容器里面的 tar -xmf -
命令进行解压。
看了 kubectl cp
的核心源码, 可以看出使用 cp 子命令是依赖容器里面的 tar 命令,所以 Pod 和宿主机之间能拷贝文件或文件夹的前提是容器必须有 tar 二进制文件。
使用 client-go 实现 cp 示例
参考 kubectl cp 就可以实现了,直接放码过来了....
package main
import (
"archive/tar"
"flag"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"strings"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
)
var (
KubeQPS = float32(5.000000)
KubeBurst = 10
kubeConfig *string
AcceptContentTypes = "application/json"
ContentType = "application/json"
)
func setKubeConfig(config *rest.Config) {
config.QPS = KubeQPS
config.Burst = KubeBurst
config.ContentType = ContentType
config.AcceptContentTypes = AcceptContentTypes
config.UserAgent = rest.DefaultKubernetesUserAgent()
}
// InitKubeConfig 初始化 k8s api 连接配置
func InitKubeConfig(env bool) (*rest.Config, error) {
if !env {
if kubeConfig != nil {
config, err := clientcmd.BuildConfigFromFlags("", *kubeConfig)
if err != nil {
panic(err.Error())
}
setKubeConfig(config)
return config, err
}
if home := homeDir(); home != "" {
kubeConfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeConfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
config, err := clientcmd.BuildConfigFromFlags("", *kubeConfig)
if err != nil {
panic(err.Error())
}
setKubeConfig(config)
return config, err
} else {
config, err := rest.InClusterConfig()
if err != nil {
panic(err.Error())
}
if err != nil {
panic(err)
}
setKubeConfig(config)
return config, err
}
}
func homeDir() string {
if h := os.Getenv("HOME"); h != "" {
return h
}
return os.Getenv("USERPROFILE") // windows
}
// NewClientSet ClientSet 客户端
func NewClientSet(c *rest.Config) (*kubernetes.Clientset, error) {
clientSet, err := kubernetes.NewForConfig(c)
return clientSet, err
}
func main() {
// 实例化 k8s 客户端
kubeConfig, err := InitKubeConfig(false)
if err != nil {
fmt.Errorf("ERROR: %s", err)
}
clientSet, err := NewClientSet(kubeConfig)
if err != nil {
fmt.Errorf("ERROR: %s", err)
}
err = copyFromPod(kubeConfig, clientSet)
if err != nil {
fmt.Errorf("ERROR: %s", err)
}
err = copyToPod(kubeConfig, clientSet)
if err != nil {
fmt.Errorf("ERROR: %s", err)
}
}
func getPrefix(file string) string {
return strings.TrimLeft(file, "/")
}
// stripPathShortcuts removes any leading or trailing "../" from a given path
func stripPathShortcuts(p string) string {
newPath := path.Clean(p)
trimmed := strings.TrimPrefix(newPath, "../")
for trimmed != newPath {
newPath = trimmed
trimmed = strings.TrimPrefix(newPath, "../")
}
// trim leftover {".", ".."}
if newPath == "." || newPath == ".." {
newPath = ""
}
if len(newPath) > 0 && string(newPath[0]) == "/" {
return newPath[1:]
}
return newPath
}
func unTarAll(reader io.Reader, destDir, prefix string) error {
tarReader := tar.NewReader(reader)
for {
header, err := tarReader.Next()
if err != nil {
if err != io.EOF {
return err
}
break
}
if !strings.HasPrefix(header.Name, prefix) {
return fmt.Errorf("tar contents corrupted")
}
mode := header.FileInfo().Mode()
destFileName := filepath.Join(destDir, header.Name[len(prefix):])
baseName := filepath.Dir(destFileName)
if err := os.MkdirAll(baseName, 0755); err != nil {
return err
}
if header.FileInfo().IsDir() {
if err := os.MkdirAll(destFileName, 0755); err != nil {
return err
}
continue
}
evaledPath, err := filepath.EvalSymlinks(baseName)
if err != nil {
return err
}
if mode&os.ModeSymlink != 0 {
linkname := header.Linkname
if !filepath.IsAbs(linkname) {
_ = filepath.Join(evaledPath, linkname)
}
if err := os.Symlink(linkname, destFileName); err != nil {
return err
}
} else {
outFile, err := os.Create(destFileName)
if err != nil {
return err
}
defer outFile.Close()
if _, err := io.Copy(outFile, tarReader); err != nil {
return err
}
if err := outFile.Close(); err != nil {
return err
}
}
}
return nil
}
// copyFromPod 从 pod 复制文件到本地
func copyFromPod(r *rest.Config, c *kubernetes.Clientset) error {
reader, outStream := io.Pipe()
// 初始化pod所在的 coreV1 资源组,发送请求
req := c.CoreV1().RESTClient().Get().
Resource("pods").
Name("nginx-6fc95cbdfc-dlnt6").
Namespace("default").
SubResource("exec").
VersionedParams(&corev1.PodExecOptions{
// 将数据转换成数据流
Command: []string{"tar", "cf", "-", "/etc/nginx"},
Stdin: true,
Stdout: true,
Stderr: true,
TTY: false,
}, scheme.ParameterCodec)
// remotecommand 主要实现了http 转 SPDY 添加X-Stream-Protocol-Version相关header 并发送请求
exec, err := remotecommand.NewSPDYExecutor(r, "POST", req.URL())
if err != nil {
return err
}
go func() {
defer outStream.Close()
err = exec.Stream(remotecommand.StreamOptions{
Stdin: os.Stdin,
Stdout: outStream,
Stderr: os.Stderr,
Tty: false,
})
cmdutil.CheckErr(err)
}()
prefix := getPrefix("/etc/nginx")
prefix = path.Clean(prefix)
prefix = stripPathShortcuts(prefix)
destPath := path.Join("./", path.Base(prefix))
err = unTarAll(reader, destPath, prefix)
return nil
}
func copyToPod(r *rest.Config, c *kubernetes.Clientset) error {
reader, writer := io.Pipe()
go func() {
defer writer.Close()
cmdutil.CheckErr(makeTar("./demo", "/demo", writer))
}()
req := c.CoreV1().RESTClient().Post().
Resource("pods").
Name("nginx-6fc95cbdfc-dlnt6").
Namespace("default").
SubResource("exec").
VersionedParams(&corev1.PodExecOptions{
Command: []string{"tar", "-xmf", "-"},
Stdin: true,
Stdout: true,
Stderr: true,
TTY: false,
}, scheme.ParameterCodec)
exec, err := remotecommand.NewSPDYExecutor(r, "POST", req.URL())
if err != nil {
return err
}
err = exec.Stream(remotecommand.StreamOptions{
Stdin: reader,
Stdout: os.Stdout,
Stderr: os.Stderr,
Tty: false,
})
if err != nil {
return err
}
return nil
}
func makeTar(srcPath, destPath string, writer io.Writer) error {
// TODO: use compression here?
tarWriter := tar.NewWriter(writer)
defer tarWriter.Close()
srcPath = path.Clean(srcPath)
destPath = path.Clean(destPath)
return recursiveTar(path.Dir(srcPath), path.Base(srcPath), path.Dir(destPath), path.Base(destPath), tarWriter)
}
func recursiveTar(srcBase, srcFile, destBase, destFile string, tw *tar.Writer) error {
srcPath := path.Join(srcBase, srcFile)
matchedPaths, err := filepath.Glob(srcPath)
if err != nil {
return err
}
for _, fpath := range matchedPaths {
stat, err := os.Lstat(fpath)
if err != nil {
return err
}
if stat.IsDir() {
files, err := ioutil.ReadDir(fpath)
if err != nil {
return err
}
if len(files) == 0 {
//case empty directory
hdr, _ := tar.FileInfoHeader(stat, fpath)
hdr.Name = destFile
if err := tw.WriteHeader(hdr); err != nil {
return err
}
}
for _, f := range files {
if err := recursiveTar(srcBase, path.Join(srcFile, f.Name()), destBase, path.Join(destFile, f.Name()), tw); err != nil {
return err
}
}
return nil
} else if stat.Mode()&os.ModeSymlink != 0 {
//case soft link
hdr, _ := tar.FileInfoHeader(stat, fpath)
target, err := os.Readlink(fpath)
if err != nil {
return err
}
hdr.Linkname = target
hdr.Name = destFile
if err := tw.WriteHeader(hdr); err != nil {
return err
}
} else {
//case regular file or other file type like pipe
hdr, err := tar.FileInfoHeader(stat, fpath)
if err != nil {
return err
}
hdr.Name = destFile
if err := tw.WriteHeader(hdr); err != nil {
return err
}
f, err := os.Open(fpath)
if err != nil {
return err
}
defer f.Close()
if _, err := io.Copy(tw, f); err != nil {
return err
}
return f.Close()
}
}
return nil
}
文章转载自ProdanLabs,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




