上篇文章 介绍了用于将 binlog 同步到 MySQL TiDB 的 Loader package,本文往回退一步,介绍 Drainer 同步到不同下游的机制。
TiDB Binlog(github.com/pingcap/tidb-binlog)用于收集 TiDB 的 binlog,并准实时同步给下游。同步数据这一步重要操作由 Drainer 模块支持,它可以将 binlog 同步到 TiDB MySQL Kafka File (增量备份)等下游组件。
对于 TiDB 和 MySQL 两种类型的下游组件,Drainer 会从 binlog 中还原出对应的 SQL 操作在下游直接执行; 对于 Kafka 和 File(增量备份)两种类型的下游组件,输出约定编码格式的 binlog。用户可以定制后续各种处理流程,如更新搜索引擎索引、清除缓存、增量备份等。TiDB Binlog 自带工具 Reparo 实现了将增量备份数据(下游类型为 File(增量备份))同步到 TiDB MySQL 的功能。
本文将按以下几个小节介绍 Drainer 如何将收到的 binlog 同步到下游:
Drainer Sync 模块
Syncer
// Syncer sync binlog item to downstream
type Syncer interface {
Sync the binlog item to downstream
Sync(item *Item) error
will be close if Close normally or meet error, call Error() to check it
Successes() <-chan *Item
Return not nil if fail to sync data to downstream or nil if closed normally
Error() <-chan error
Close the Syncer, no more item can be added by `Sync`
Close() error
}
Checkpoint
type CheckPoint interface {
Load loads checkpoint information.
Load() error
Save saves checkpoint information.
Save(int64) error
Pos gets position information.
TS() int64
Close closes the CheckPoint and release resources, after closed other methods should not be called again.
Close() error
}
Translator
// Txn holds transaction info, an DDL or DML sequences
type Txn struct {
DMLs []*DML
DDL *DDL
This field is used to hold arbitrary data you wish to include so it
will be available when receiving on the Successes channel
Metadata interface{}
}
Schema
func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error {
var i int
for i = 0; i < len(s.jobs); i++ {
if s.jobs[i].BinlogInfo.SchemaVersion <= version {
_, _, _, err := s.handleDDL(s.jobs[i])
if err != nil {
return errors.Annotatef(err, "handle ddl job %v failed, the schema info: %s", s.jobs[i], s)
}
} else {
break
}
}
s.jobs = s.jobs[i:]
return nil
}
恢复工具
读取 binlog
// BinlogName creates a binlog file name. The file name format is like binlog-0000000000000001-20181010101010
func BinlogName(index uint64) string {
currentTime := time.Now()
return binlogNameWithDateTime(index, currentTime)
}
// binlogNameWithDateTime creates a binlog file name.
func binlogNameWithDateTime(index uint64, datetime time.Time) string {
return fmt.Sprintf("binlog-%016d-%s", index, datetime.Format(datetimeFormat))
}
// ReadDir reads and returns all file and dir names from directory
func ReadDir(dirpath string) ([]string, error) {
dir, err := os.Open(dirpath)
if err != nil {
return nil, errors.Trace(err)
}
defer dir.Close()
names, err := dir.Readdirnames(-1)
if err != nil {
return nil, errors.Annotatef(err, "dir %s", dirpath)
}
sort.Strings(names)
return names, nil
}
func Decode(r io.Reader) (*pb.Binlog, int64, error) {
payload, length, err := binlogfile.Decode(r)
if err != nil {
return nil, 0, errors.Trace(err)
}
binlog := &pb.Binlog{}
err = binlog.Unmarshal(payload)
if err != nil {
return nil, 0, errors.Trace(err)
}
return binlog, length, nil
}
写入 TiDB
小结
TiDB Binlog 源码阅读系列在此就全部完结了
相信大家通过本系列文章更全面地理解了 TiDB Binlog 的原理和实现细节。我们将继续打磨优化,欢迎大家给我们反馈使用过程中遇到的问题或建议;如果社区小伙伴们想参与 TiDB Binlog 的设计、开发和测试,也欢迎与我们联系 info@pingcap.com,或者在 Repo 中 提 issue 讨论。
(七)Drainer server 介绍
TiDB Binlog(github.com/pingcap/tidb-binlog)组件用于收集 TiDB 的 binlog,并准实时同步给下游,如 TiDB、MySQL 等。该组件在功能上类似于 MySQL 的主从复制,会收集各个 TiDB 实例产生的 binlog,并按事务提交的时间排序,全局有序的将数据同步至下游。利用 TiDB Binlog 可以实现数据准实时同步到其他数据库,以及 TiDB 数据准实时的备份与恢复。我们希望通过《TiDB Binlog 源码阅读系列文章》帮助大家理解和掌握这个项目,也有助于我们和社区共同进行 TiDB Binlog 的设计、开发和测试。

文章转载自PingCAP,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




