归档到S3源码实现
从TiKV归档日志到S3中，需要访问TiKV，使用TiKV的Go语言API接口，可参考《Go Client: Interact with TiKV using Go》。从TiKV中读取日志后再存到S3中，同样需要Go语言的S3接口，可参考文档：https://docs.aws.amazon.com/sdk-for-go/api/service/s3/ 。实现逻辑比较容易理解，代码如下：
package cmd
// archiveCmd is the "archive" sub-command of the hbr CLI. Its Run handler,
// runArchive, copies He3DB xlog key/value pairs from TiKV into S3.
var archiveCmd = &cobra.Command{
	Use:   "archive",
	Run:   runArchive,
	Short: "Archive He3DB Xlog KV",
	Long:  "Welcome to use hbr for He3DB xlog archive",
}
// init registers archiveCmd as a sub-command of rootCmd (rootCmd is defined
// outside this excerpt), making "archive" available on the command line.
func init() {
rootCmd.AddCommand(archiveCmd) // register the archive command
}
// runArchive is the Run handler of the "archive" command. It copies He3DB
// WAL key/value pairs from TiKV (raw KV API) into an S3 bucket, storing each
// value as the object "<name>/<hex-encoded key>".
//
// Required flags: access_key, secret_key, endpoint, region, bucket (S3),
// pd (TiKV PD address), name (backup name, used as the S3 key prefix),
// archive_start_time_line and archive_start_lsn (16 hex digits each). If any
// is missing or malformed the command prints "PARAMETER ERROR!" and returns.
//
// For every id in [0, 8) it scans, page by page, from the start key
//
//	06 | time line | id | start LSN
//
// up to the end key passed to Scan
//
//	06 | time line | id | ffffffffffffffff
//
// and uploads every pair with s3PutKV in its own goroutine. At most
// `concurrency` uploads run at once (the capacity of sem), and the
// package-level WaitGroup wg is used to wait for all of them. A TiKV scan
// error stops the archive after the in-flight uploads finish.
func runArchive(cmd *cobra.Command, args []string) {
	var sem = make(chan bool, concurrency)
	archiveStart := time.Now()

	// A GetString error (undefined flag) yields "", which the check below rejects.
	accessKey, _ := cmd.Flags().GetString("access_key") // S3 access key
	secretKey, _ := cmd.Flags().GetString("secret_key") // S3 secret key
	endpoint, _ := cmd.Flags().GetString("endpoint")    // S3 endpoint
	region, _ := cmd.Flags().GetString("region")        // S3 region
	bucket, _ := cmd.Flags().GetString("bucket")        // S3 bucket name
	pd, _ := cmd.Flags().GetString("pd")                // TiKV PD address
	backupName, _ := cmd.Flags().GetString("name")
	timeLine, _ := cmd.Flags().GetString("archive_start_time_line")
	startLSN, _ := cmd.Flags().GetString("archive_start_lsn")
	if accessKey == "" || secretKey == "" || endpoint == "" || region == "" || bucket == "" || pd == "" || backupName == "" || timeLine == "" || startLSN == "" {
		fmt.Printf("PARAMETER ERROR!\n")
		return
	}
	// The time line and the start LSN each fill one 8-byte field of the WAL
	// key, so they must be exactly 16 hex digits. Any other length would
	// build a key whose fields are misaligned.
	if len(timeLine) != 16 || len(startLSN) != 16 {
		fmt.Printf("PARAMETER ERROR!\narchive_start_time_line and archive_start_lsn must be 16 hex digits\n")
		return
	}

	client, err := tikv.NewRawKVClient([]string{pd}, config.Security{}) // TiKV raw KV client
	if err != nil {
		fmt.Printf("Connect Tikv Error!\n%v\n", err)
		return
	}
	// Deferred so the client is also released on every early return below.
	defer client.Close()

	// The session the S3 uploader will use.
	sess, err := session.NewSession(&aws.Config{
		Region:           aws.String(region),
		Endpoint:         aws.String(endpoint),
		Credentials:      credentials.NewStaticCredentials(accessKey, secretKey, ""),
		S3ForcePathStyle: aws.Bool(true),
	})
	if err != nil {
		fmt.Printf("Connect S3 Error!\n%v\n", err)
		return
	}
	s3Client := s3.New(sess)

	const limit = 10240 // maximum number of pairs fetched per Scan call
	wlCount := 0
	fmt.Printf("archive wal kv!\n")
	for id := 0; id < 8; id++ {
		// Key layout example: 06 0000000000000001 0000000000000007 0000000000000000
		// (prefix, time line, id, LSN). Because the id field sits between the
		// time line and the LSN, one run cannot currently span time lines.
		retStart, err := archiveKeyFromHex(fmt.Sprintf("06%s000000000000000%d%s", timeLine, id, startLSN))
		if err != nil {
			fmt.Printf("PARAMETER ERROR!\n%v\n", err)
			wg.Wait()
			return
		}
		retEnd, err := archiveKeyFromHex(fmt.Sprintf("06%s000000000000000%dffffffffffffffff", timeLine, id))
		if err != nil {
			fmt.Printf("PARAMETER ERROR!\n%v\n", err)
			wg.Wait()
			return
		}
		fmt.Printf("%x\n", retStart)
		fmt.Printf("%x\n", retEnd)

		for {
			keys, values, err := client.Scan(retStart, retEnd, limit) // read WAL pairs from TiKV
			if err != nil {
				fmt.Printf("Scan Tikv Error!\n%v\n", err)
				// Let the uploads already started finish before giving up.
				wg.Wait()
				return
			}
			lastKey := retStart
			for k := range keys {
				fmt.Printf("%x\n", keys[k])
				filename := fmt.Sprintf("%x", keys[k])
				wg.Add(1)
				sem <- true // blocks while `concurrency` uploads are in flight
				go s3PutKV(s3Client, bucket, backupName, filename, values[k], sem)
				if bytes.Compare(lastKey, keys[k]) < 0 {
					lastKey = keys[k]
				}
				wlCount++
			}
			// A short page means the range is exhausted.
			if len(keys) < limit {
				break
			}
			// Scan includes its start key, so resume strictly after the largest
			// key seen. Appending 0x00 gives the smallest key greater than
			// lastKey, so the boundary pair is neither re-uploaded nor re-counted.
			retStart = append(append(make([]byte, 0, len(lastKey)+1), lastKey...), 0)
		}
	}
	wg.Wait()
	fmt.Printf("wal kv count:%v\n", wlCount)
	fmt.Println("backup time:", time.Since(archiveStart))
}

// archiveKeyFromHex decodes a string of hexadecimal digit pairs into the raw
// bytes of a TiKV key. It returns an error if s has odd length or contains a
// character that is not a hex digit.
func archiveKeyFromHex(s string) ([]byte, error) {
	if len(s)%2 != 0 {
		return nil, fmt.Errorf("hex key %q has odd length", s)
	}
	key := make([]byte, len(s)/2)
	for i := range key {
		v, err := strconv.ParseUint(s[2*i:2*i+2], 16, 8)
		if err != nil {
			return nil, fmt.Errorf("hex key %q: %w", s, err)
		}
		key[i] = byte(v)
	}
	return key, nil
}
// s3PutKV stores one archived value v in bucket as the object
// "<backup_name>/<filename>".
//
// It is meant to run as a goroutine. When it returns, it first frees one
// slot of the sem semaphore and then marks one unit of the package-level
// WaitGroup wg as done. A PutObject failure is printed and terminates the
// whole process with exit status 1.
func s3PutKV(s3_client *s3.S3, bucket string, backup_name string, filename string, v []byte, sem chan bool) {
	defer func() {
		<-sem     // give the concurrency slot back first,
		wg.Done() // then report this upload as finished
	}()

	objectKey := backup_name + "/" + filename
	input := &s3.PutObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(objectKey),
		Body:   bytes.NewReader(v),
	}
	if _, err := s3_client.PutObject(input); err != nil {
		fmt.Printf("S3 PutObject Error!\n%v\n", err)
		// os.Exit does not run deferred calls; the process ends immediately.
		os.Exit(1)
	}
}
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




