暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

He3DB日志归档-归档到S3源码实现

原创 有木名凌霄 2023-09-28
153

归档到S3源码实现

从tikv归档日志到S3中,需要访问tikv,使用tikv的Go语言API接口,可参考Go Client: Interact with TiKV using Go
。从tikv中取日志,然后存到S3中,同样需要Go语言的S3接口,可参考文档:https://docs.aws.amazon.com/sdk-for-go/api/service/s3/。 实现逻辑比较容易理解,代码如下:

package cmd var archiveCmd = &cobra.Command{ // cobra.Command 是一个结构体,代表一个命令 Use: "archive", // 命令的名称 Short: "Archive He3DB Xlog KV", // 当前命令的简短描述 Long: "Welcome to use hbr for He3DB xlog archive", // 当前命令的完整描述 Run: runArchive, // 属性是一个函数,当执行命令时会调用此函数 } func init() { rootCmd.AddCommand(archiveCmd) // 添加archive命令 } func runArchive(cmd *cobra.Command, args []string) { var sem = make(chan bool, concurrency) archiveStart := time.Now() access_key, _ := cmd.Flags().GetString("access_key") // s3的access_key secret_key, _ := cmd.Flags().GetString("secret_key") // s3的secret_key endpoint, _ := cmd.Flags().GetString("endpoint") // s3的访问地址 region, _ := cmd.Flags().GetString("region") // s3的region bucket, _ := cmd.Flags().GetString("bucket") // s3 桶名称 pd, _ := cmd.Flags().GetString("pd") // tikv的pd地址 backup_name, _ := cmd.Flags().GetString("name") archive_start_time_line, _ := cmd.Flags().GetString("archive_start_time_line") archive_start_lsn, _ := cmd.Flags().GetString("archive_start_lsn") if access_key == "" || secret_key == "" || endpoint == "" || region == "" || bucket == "" || pd == "" || backup_name == "" || archive_start_time_line == "" || archive_start_lsn == "" { fmt.Printf("PARAMETER ERROR!\n") return } client, err := tikv.NewRawKVClient([]string{pd}, config.Security{}) // tikv go client,访问tikv用 if err != nil { fmt.Printf("Connect Tikv Error!\n%v\n", err) return } // The session the S3 Uploader will use sess, err := session.NewSession(&aws.Config{ Region: aws.String(region), Endpoint: aws.String(endpoint), Credentials: credentials.NewStaticCredentials(access_key, secret_key, ""), S3ForcePathStyle: aws.Bool(true), }) if err != nil { fmt.Printf("Connect S3 Error!\n%v\n", err) return } s3_client := s3.New(sess) // New creates a new instance of the S3 client with a session var filename string = "" wlCount := 0 // archive wal kv fmt.Printf("archive wal kv!\n") for id := 0; id < 8; id++ { //06000000000000000100000000000000070000000000000000 //因为加了个id字段,目前不能跨时间线备份 retStartString := fmt.Sprintf("06%s000000000000000%d%s", archive_start_time_line, id, archive_start_lsn) //retEndString := fmt.Sprintf("06ffffffffffffffff000000000000000%dffffffffffffffff", id) retEndString := fmt.Sprintf("06%s000000000000000%dffffffffffffffff", archive_start_time_line, id) retStart := make([]byte, 25) retEnd := make([]byte, 25) index := 0 for i := 0; i < len(retStartString); i += 2 { value, _ := strconv.ParseUint(retStartString[i:i+2], 16, 8) retStart[index] = byte(0xff & value) value, _ = strconv.ParseUint(retEndString[i:i+2], 16, 8) retEnd[index] = byte(0xff & value) index++ } fmt.Printf("%x\n", retStart) fmt.Printf("%x\n", retEnd) limit := 10240 for { keys, values, _ := client.Scan(retStart, retEnd, limit) // 从tikv中读日志 for k, _ := range keys { fmt.Printf("%x\n", keys[k]) filename = fmt.Sprintf("%x", keys[k]) wg.Add(1) sem <- true go s3PutKV(s3_client, bucket, backup_name, filename, values[k], sem) // 调用s3 api接口,将日志存到s3中 if bytes.Compare(retStart, keys[k]) < 0 { retStart = keys[k] } wlCount++ } if len(keys) < limit { break } wlCount-- } } wg.Wait() client.Close() fmt.Printf("wal kv count:%v\n", wlCount) fmt.Println("backup time:", time.Since(archiveStart)) } // 存数据到s3 func s3PutKV(s3_client *s3.S3, bucket string, backup_name string, filename string, v []byte, sem chan bool) { defer wg.Done() defer func() { <-sem }() _, err := s3_client.PutObject(&s3.PutObjectInput{ // Adds an object to a bucket. Bucket: aws.String(bucket), // 存到那个桶中 Key: aws.String(backup_name + "/" + filename), // key的构造:backup_name + filename Body: bytes.NewReader(v), }) if err != nil { fmt.Printf("S3 PutObject Error!\n%v\n", err) os.Exit(1) } //fmt.Printf("S3 PutObject!\n") }
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论