
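BitSail is a data integration product developed in-house at ByteDance. It supports data synchronization between many heterogeneous data sources and provides a full-domain data integration solution covering offline, real-time, full, and incremental scenarios. This series focuses on the BitSail Connector development module and walks through development methods and scenario examples in detail. This installment covers the SourceSplitCoordinator interface.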

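By Haoyu (浩宇), ByteDance Data Platform BitSail team
Stay tuned: this development guide is published in four parts.
● Development Guide Part 2: SourceSplitCoordinator
● Development Guide Part 3: SourceReader
● Development Guide Part 4: Sink, Writer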
Source Connector

This article introduces SplitCoordinator, the component responsible for creating and managing Splits.
SourceSplitCoordinator
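A core task of a big data processing framework is to divide large-scale data into a reasonable set of Splits. SplitCoordinator is the component that creates and manages these Splits.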


public interface SourceSplitCoordinator<SplitT extends SourceSplit, StateT> extends Serializable, AutoCloseable {

  void start();

  void addReader(int subtaskId);

  void addSplitsBack(List<SplitT> splits, int subtaskId);

  void handleSplitRequest(int subtaskId, @Nullable String requesterHostname);

  default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
  }

  StateT snapshotState() throws Exception;

  default void notifyCheckpointComplete(long checkpointId) throws Exception {
  }

  void close();

  interface Context<SplitT extends SourceSplit, StateT> {

    boolean isRestored();

    /**
     * Return the state to the split coordinator, for the exactly-once.
     */
    StateT getRestoreState();

    /**
     * Return total parallelism of the source reader.
     */
    int totalParallelism();

    /**
     * When Source reader started, it will be registered itself to coordinator.
     */
    Set<Integer> registeredReaders();

    /**
     * Assign splits to reader.
     */
    void assignSplit(int subtaskId, List<SplitT> splits);

    /**
     * Mainly use in boundedness situation, represents there will no more split will send to source reader.
     */
    void signalNoMoreSplits(int subtask);

    /**
     * If split coordinator have any event want to send source reader, use this method.
     * Like send Pause event to Source Reader in CDC2.0.
     */
    void sendEventToSourceReader(int subtaskId, SourceEvent event);

    /**
     * Schedule to run the callable and handler, often used in un-boundedness mode.
     */
    <T> void runAsync(Callable<T> callable,
                      BiConsumer<T, Throwable> handler,
                      int initialDelay,
                      long interval);

    /**
     * Just run callable and handler once, often used in boundedness mode.
     */
    <T> void runAsyncOnce(Callable<T> callable,
                          BiConsumer<T, Throwable> handler);
  }
}

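In the constructor, developers typically set up configuration and create the containers that hold split information.
Take the constructor of ClickhouseSourceSplitCoordinator as an example: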
public ClickhouseSourceSplitCoordinator(SourceSplitCoordinator.Context<ClickhouseSourceSplit, EmptyState> context,
                                        BitSailConfiguration jobConf) {
  this.context = context;
  this.jobConf = jobConf;
  this.splitAssignmentPlan = Maps.newConcurrentMap();
}
When a custom State is defined, the state stored in SourceSplitCoordinator.Context at checkpoint time must be saved and then restored in the constructor.
Take RocketMQSourceSplitCoordinator as an example:
public RocketMQSourceSplitCoordinator(
    SourceSplitCoordinator.Context<RocketMQSplit, RocketMQState> context,
    BitSailConfiguration jobConfiguration,
    Boundedness boundedness) {
  this.context = context;
  this.jobConfiguration = jobConfiguration;
  this.boundedness = boundedness;
  this.discoveryInternal = jobConfiguration.get(RocketMQSourceOptions.DISCOVERY_INTERNAL);
  this.pendingRocketMQSplitAssignment = Maps.newConcurrentMap();

  this.discoveredPartitions = new HashSet<>();
  if (context.isRestored()) {
    // Recover the partitions that were already assigned before the checkpoint.
    RocketMQState restoreState = context.getRestoreState();
    assignedPartitions = restoreState.getAssignedWithSplits();
    discoveredPartitions.addAll(assignedPartitions.keySet());
  } else {
    assignedPartitions = Maps.newHashMap();
  }

  prepareConsumerProperties();
}

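The start method extracts the split metadata that the data source requires. If a dedicated Split Assigner class has been abstracted out, it is usually initialized here. If a wrapped Split Assign function is used instead, the splits awaiting assignment are initialized here.
/ Unified stream and batch scenario /
Take RocketMQSourceSplitCoordinator as an example: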
private void prepareRocketMQConsumer() {
  try {
    consumer = RocketMQUtils.prepareRocketMQConsumer(jobConfiguration,
        String.format(COORDINATOR_INSTANCE_NAME_TEMPLATE,
            cluster, topic, consumerGroup, UUID.randomUUID()));
    consumer.start();
  } catch (Exception e) {
    throw BitSailException.asBitSailException(RocketMQErrorCode.CONSUMER_CREATE_FAILED, e);
  }
}

@Override
public void start() {
  prepareRocketMQConsumer();
  splitAssigner = new FairRocketMQSplitAssigner(jobConfiguration, assignedPartitions);
  if (discoveryInternal > 0) {
    // Unbounded mode: rediscover message queues periodically.
    context.runAsync(
        this::fetchMessageQueues,
        this::handleMessageQueueChanged,
        0,
        discoveryInternal
    );
  } else {
    // Bounded mode: discover message queues once.
    context.runAsyncOnce(
        this::fetchMessageQueues,
        this::handleMessageQueueChanged
    );
  }
}
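The source does not show fetchMessageQueues or handleMessageQueueChanged. As a rough illustration of what a periodic discovery handler typically has to do, the sketch below compares each fetched set of partitions with the ones already known and returns only the new ones. The class PartitionDiscoverySketch, the method handleFetched, and the use of plain strings for partitions are all hypothetical and are not BitSail code.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper, not part of BitSail: remembers which partitions are already known. */
final class PartitionDiscoverySketch {
  private final Set<String> discoveredPartitions = new HashSet<>();

  /** Returns the partitions in fetched that were not seen in any earlier call. */
  Set<String> handleFetched(Set<String> fetched) {
    Set<String> added = new TreeSet<>(fetched); // sorted copy, so printing is stable
    added.removeAll(discoveredPartitions);
    discoveredPartitions.addAll(added);
    return added;
  }

  public static void main(String[] args) {
    PartitionDiscoverySketch discovery = new PartitionDiscoverySketch();
    // First discovery round: everything is new.
    System.out.println(discovery.handleFetched(
        new HashSet<>(Arrays.asList("queue-0", "queue-1"))));
    // Later round: only the partition that appeared since the last round is returned.
    System.out.println(discovery.handleFetched(
        new HashSet<>(Arrays.asList("queue-0", "queue-1", "queue-2"))));
  }
}
```

With a one-shot discovery the handler runs a single time, so every fetched partition counts as new. With periodic discovery the same handler is called repeatedly, and only the difference needs to be turned into new splits.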
/ Batch scenario /
Take ClickhouseSourceSplitCoordinator as an example:
public void start() {
  List<ClickhouseSourceSplit> splitList;
  try {
    SimpleDivideSplitConstructor constructor = new SimpleDivideSplitConstructor(jobConf);
    splitList = constructor.construct();
  } catch (IOException e) {
    // Fall back to a single split that reads the whole table.
    ClickhouseSourceSplit split = new ClickhouseSourceSplit(0);
    split.setReadTable(true);
    splitList = Collections.singletonList(split);
    LOG.error("Failed to construct splits, will directly read the table.", e);
  }

  int readerNum = context.totalParallelism();
  LOG.info("Found {} readers and {} splits.", readerNum, splitList.size());
  if (readerNum > splitList.size()) {
    LOG.error("Reader number {} is larger than split number {}.", readerNum, splitList.size());
  }

  for (ClickhouseSourceSplit split : splitList) {
    int readerIndex = ReaderSelector.getReaderIndex(readerNum);
    splitAssignmentPlan.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(split);
    LOG.info("Will assign split {} to the {}-th reader", split.uniqSplitId(), readerIndex);
  }
}
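The implementation of ReaderSelector.getReaderIndex is not shown here. To illustrate how an assignment plan of this shape can be built, the following minimal sketch assumes a simple round-robin choice of reader index. RoundRobinPlanner and its plan method are hypothetical names, and splits are represented by plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch, not BitSail code: builds a reader-index to splits plan by round-robin. */
final class RoundRobinPlanner {
  private RoundRobinPlanner() {
  }

  static <S> Map<Integer, List<S>> plan(List<S> splits, int readerNum) {
    if (readerNum <= 0) {
      throw new IllegalArgumentException("readerNum must be positive");
    }
    Map<Integer, List<S>> assignmentPlan = new HashMap<>();
    for (int i = 0; i < splits.size(); i++) {
      int readerIndex = i % readerNum; // assumption: the i-th split goes to reader i mod readerNum
      assignmentPlan.computeIfAbsent(readerIndex, k -> new ArrayList<>()).add(splits.get(i));
    }
    return assignmentPlan;
  }

  public static void main(String[] args) {
    // Five splits over two readers.
    System.out.println(plan(Arrays.asList("s0", "s1", "s2", "s3", "s4"), 2));
    // One split over three readers: readers 1 and 2 receive no entry in the plan.
    System.out.println(plan(Arrays.asList("only"), 3));
  }
}
```

The second call in main corresponds to the case the coordinator logs as an error: when there are more readers than splits, some readers have nothing to read.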

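The Assigner distributes the prepared splits to the Readers. During development, we usually let SourceSplitCoordinator focus on communication with the Readers and encapsulate the actual split distribution logic in an Assigner. The Assigner can be a wrapped Split Assign function or a dedicated Split Assigner class.
/ Assign function example /
Take ClickhouseSourceSplitCoordinator as an example:
The tryAssignSplitsToReader function assigns the prepared splits stored in splitAssignmentPlan to the corresponding Readers.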
private void tryAssignSplitsToReader() {
  Map<Integer, List<ClickhouseSourceSplit>> splitsToAssign = new HashMap<>();

  // Only readers that have already registered can receive splits.
  for (Integer readerIndex : splitAssignmentPlan.keySet()) {
    if (CollectionUtils.isNotEmpty(splitAssignmentPlan.get(readerIndex))
        && context.registeredReaders().contains(readerIndex)) {
      splitsToAssign.put(readerIndex, Lists.newArrayList(splitAssignmentPlan.get(readerIndex)));
    }
  }

  for (Integer readerIndex : splitsToAssign.keySet()) {
    LOG.info("Try assigning splits reader {}, splits are: [{}]", readerIndex,
        splitsToAssign.get(readerIndex).stream()
            .map(ClickhouseSourceSplit::uniqSplitId)
            .collect(Collectors.toList()));
    splitAssignmentPlan.remove(readerIndex);
    context.assignSplit(readerIndex, splitsToAssign.get(readerIndex));
    context.signalNoMoreSplits(readerIndex);
    LOG.info("Finish assigning splits reader {}", readerIndex);
  }
}
/ Assigner class example /
Take RocketMQSourceSplitCoordinator as an example:
public class FairRocketMQSplitAssigner implements SplitAssigner<MessageQueue> {

  private BitSailConfiguration readerConfiguration;

  private AtomicInteger atomicInteger;

  public Map<MessageQueue, String> rocketMQSplitIncrementMapping;

  public FairRocketMQSplitAssigner(BitSailConfiguration readerConfiguration,
                                   Map<MessageQueue, String> rocketMQSplitIncrementMapping) {
    this.readerConfiguration = readerConfiguration;
    this.rocketMQSplitIncrementMapping = rocketMQSplitIncrementMapping;
    this.atomicInteger = new AtomicInteger(CollectionUtils
        .size(rocketMQSplitIncrementMapping.keySet()));
  }

  @Override
  public String assignSplitId(MessageQueue messageQueue) {
    if (!rocketMQSplitIncrementMapping.containsKey(messageQueue)) {
      rocketMQSplitIncrementMapping.put(messageQueue, String.valueOf(atomicInteger.getAndIncrement()));
    }
    return rocketMQSplitIncrementMapping.get(messageQueue);
  }

  @Override
  public int assignToReader(String splitId, int totalParallelism) {
    return splitId.hashCode() % totalParallelism;
  }
}
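One detail of hash-modulo assignment is worth keeping in mind when writing a custom Assigner: Java's % operator keeps the sign of the dividend, so a split ID whose hashCode() is negative would produce a negative reader index. The short numeric IDs generated by assignSplitId hash to small positive values, so this presumably does not arise there, but free-form IDs could trigger it. The sketch below contrasts the plain remainder with Math.floorMod; SafeHashAssigner and its method names are hypothetical and not part of BitSail.

```java
/** Hypothetical sketch, not BitSail code: two ways to map a split ID to a reader index. */
final class SafeHashAssigner {
  private SafeHashAssigner() {
  }

  /** Plain remainder: negative whenever the hash code is negative and not a multiple of the divisor. */
  static int remainderIndex(String splitId, int totalParallelism) {
    return splitId.hashCode() % totalParallelism;
  }

  /** Floor modulus: always within [0, totalParallelism) for a positive parallelism. */
  static int floorModIndex(String splitId, int totalParallelism) {
    return Math.floorMod(splitId.hashCode(), totalParallelism);
  }

  public static void main(String[] args) {
    int parallelism = 3;
    // Search for an ID with a negative hash code to show where the two variants differ.
    for (int i = 0; i < 10_000; i++) {
      String splitId = "topic-broker-queue-" + i;
      if (remainderIndex(splitId, parallelism) < 0) {
        System.out.println(splitId + ": remainder=" + remainderIndex(splitId, parallelism)
            + ", floorMod=" + floorModIndex(splitId, parallelism));
        break;
      }
    }
  }
}
```

For non-negative hash codes both variants return the same index, so switching to the floor modulus does not change how existing numeric split IDs are placed.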

The addReader method calls the Assigner to add splits for the newly registered Reader.
/ Batch scenario example /
Take ClickhouseSourceSplitCoordinator as an example:
public void addReader(int subtaskId) {
  LOG.info("Found reader {}", subtaskId);
  tryAssignSplitsToReader();
}
/ Unified stream and batch scenario example /
Take RocketMQSourceSplitCoordinator as an example:
private void notifyReaderAssignmentResult() {
  Map<Integer, List<RocketMQSplit>> tmpRocketMQSplitAssignments = new HashMap<>();

  // Collect pending splits only for readers that have already registered.
  for (Integer pendingAssignmentReader : pendingRocketMQSplitAssignment.keySet()) {
    if (CollectionUtils.isNotEmpty(pendingRocketMQSplitAssignment.get(pendingAssignmentReader))
        && context.registeredReaders().contains(pendingAssignmentReader)) {
      tmpRocketMQSplitAssignments.put(pendingAssignmentReader,
          Lists.newArrayList(pendingRocketMQSplitAssignment.get(pendingAssignmentReader)));
    }
  }

  for (Integer pendingAssignmentReader : tmpRocketMQSplitAssignments.keySet()) {
    LOG.info("Assigning splits to reader {}, splits = {}.", pendingAssignmentReader,
        tmpRocketMQSplitAssignments.get(pendingAssignmentReader));

    context.assignSplit(pendingAssignmentReader,
        tmpRocketMQSplitAssignments.get(pendingAssignmentReader));
    Set<RocketMQSplit> removes = pendingRocketMQSplitAssignment.remove(pendingAssignmentReader);
    removes.forEach(removeSplit -> {
      assignedPartitions.put(removeSplit.getMessageQueue(), removeSplit.getSplitId());
    });

    LOG.info("Assigned splits to reader {}", pendingAssignmentReader);

    if (Boundedness.BOUNDEDNESS == boundedness) {
      LOG.info("Signal reader {} no more splits assigned in future.", pendingAssignmentReader);
      context.signalNoMoreSplits(pendingAssignmentReader);
    }
  }
}

@Override
public void addReader(int subtaskId) {
  LOG.info(
      "Adding reader {} to RocketMQ Split Coordinator for consumer group {}.",
      subtaskId,
      consumerGroup);
  notifyReaderAssignmentResult();
}
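Both coordinators follow the same pattern: splits are planned per reader index first and are handed out only once that reader has registered. The sketch below isolates this pattern without any BitSail dependency. PendingAssignments is a hypothetical class, the returned map stands in for the calls to context.assignSplit, and the internal set stands in for context.registeredReaders().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch, not BitSail code: holds planned splits until their reader registers. */
final class PendingAssignments<S> {
  private final Map<Integer, Set<S>> pending = new HashMap<>();
  private final Set<Integer> registeredReaders = new HashSet<>();

  /** Records that a split is planned for the given reader index. */
  void plan(int readerIndex, S split) {
    pending.computeIfAbsent(readerIndex, k -> new LinkedHashSet<>()).add(split);
  }

  /** Registers a reader and returns every planned assignment that can now be delivered. */
  Map<Integer, List<S>> addReader(int subtaskId) {
    registeredReaders.add(subtaskId);
    Map<Integer, List<S>> ready = new HashMap<>();
    Iterator<Map.Entry<Integer, Set<S>>> it = pending.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Integer, Set<S>> entry = it.next();
      if (registeredReaders.contains(entry.getKey()) && !entry.getValue().isEmpty()) {
        ready.put(entry.getKey(), new ArrayList<>(entry.getValue()));
        it.remove(); // delivered splits leave the pending plan
      }
    }
    return ready;
  }

  /** Number of reader indexes that still have undelivered splits. */
  int pendingReaderCount() {
    return pending.size();
  }

  public static void main(String[] args) {
    PendingAssignments<String> assignments = new PendingAssignments<>();
    assignments.plan(0, "split-a");
    assignments.plan(1, "split-b");
    System.out.println(assignments.addReader(1)); // only reader 1 is registered so far
    System.out.println(assignments.addReader(0)); // reader 0 now receives its split
  }
}
```

Because every addReader call re-checks the whole pending plan, the order in which readers come up does not matter.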

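The addSplitsBack method reassigns splits that a Reader did not finish processing. The reassignment strategy can be customized. A common strategy is hash modulo: every Split in the returned list is redistributed and then assigned to the appropriate Reader.
/ Batch scenario example /
Take ClickhouseSourceSplitCoordinator as an example:
ReaderSelector redistributes the Split list using a hash-modulo strategy.
The tryAssignSplitsToReader method then assigns the redistributed Splits to the Readers through the Assigner.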
public void addSplitsBack(List<ClickhouseSourceSplit> splits, int subtaskId) {
  LOG.info("Source reader {} return splits {}.", subtaskId, splits);

  int readerNum = context.totalParallelism();
  for (ClickhouseSourceSplit split : splits) {
    int readerIndex = ReaderSelector.getReaderIndex(readerNum);
    splitAssignmentPlan.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(split);
    LOG.info("Re-assign split {} to the {}-th reader.", split.uniqSplitId(), readerIndex);
  }

  tryAssignSplitsToReader();
}
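/ Unified stream and batch scenario example /
Take RocketMQSourceSplitCoordinator as an example:
addSplitChangeToPendingAssignment redistributes the Split list using a hash-modulo strategy.
notifyReaderAssignmentResult then assigns the redistributed Splits to the Readers through the Assigner.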
private synchronized void addSplitChangeToPendingAssignment(Set<RocketMQSplit> newRocketMQSplits) {
  int numReader = context.totalParallelism();
  for (RocketMQSplit split : newRocketMQSplits) {
    int readerIndex = splitAssigner.assignToReader(split.getSplitId(), numReader);
    pendingRocketMQSplitAssignment.computeIfAbsent(readerIndex, r -> new HashSet<>())
        .add(split);
  }
  LOG.debug("RocketMQ splits {} finished assignment.", newRocketMQSplits);
}

@Override
public void addSplitsBack(List<RocketMQSplit> splits, int subtaskId) {
  LOG.info("Source reader {} return splits {}.", subtaskId, splits);
  addSplitChangeToPendingAssignment(new HashSet<>(splits));
  notifyReaderAssignmentResult();
}
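A consequence of hash-modulo reassignment is that the target index depends only on the split ID and the parallelism. As long as the parallelism is unchanged, a returned split maps back to the same reader index it had before, so it waits in the pending plan until that subtask registers again. The sketch below demonstrates this with plain string IDs. HashReassignment is a hypothetical class, and it uses Math.floorMod rather than the % operator so that the index is never negative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch, not BitSail code: re-plans returned splits by hash modulo. */
final class HashReassignment {
  private HashReassignment() {
  }

  static int assignToReader(String splitId, int totalParallelism) {
    return Math.floorMod(splitId.hashCode(), totalParallelism);
  }

  /** Groups the returned split IDs by the reader index they hash to. */
  static Map<Integer, List<String>> reassign(List<String> returnedSplitIds, int totalParallelism) {
    Map<Integer, List<String>> pending = new HashMap<>();
    for (String splitId : returnedSplitIds) {
      int readerIndex = assignToReader(splitId, totalParallelism);
      pending.computeIfAbsent(readerIndex, k -> new ArrayList<>()).add(splitId);
    }
    return pending;
  }

  public static void main(String[] args) {
    int parallelism = 2;
    // Initial placement of four numeric split IDs.
    System.out.println(reassign(Arrays.asList("0", "1", "2", "3"), parallelism));
    // Suppose reader 1 fails and returns its splits: they are planned for index 1 again.
    System.out.println(reassign(Arrays.asList("1", "3"), parallelism));
  }
}
```

If splits should move to a different reader after a failure, the strategy would have to take something other than the split ID into account, for example the set of currently registered readers.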

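The snapshotState method stores snapshot information about the splits being processed. On recovery, the constructor uses this snapshot to restore the coordinator.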
public RocketMQState snapshotState() throws Exception {
  return new RocketMQState(assignedPartitions);
}
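The round trip between snapshotState and the restore branch of the constructor can be sketched without BitSail types as follows. SnapshotSketch, its nested State and Coordinator classes, and the use of strings for partitions are hypothetical. The sketch also copies the map when taking the snapshot, which is a design choice made here and not something the source confirms for RocketMQState.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch, not BitSail code: snapshot and restore of assigned partitions. */
final class SnapshotSketch {
  private SnapshotSketch() {
  }

  /** Checkpointed state: partition name mapped to split ID. */
  static final class State implements Serializable {
    private final HashMap<String, String> assignedWithSplits;

    State(Map<String, String> assigned) {
      // Copy so that later changes in the coordinator do not alter the snapshot.
      this.assignedWithSplits = new HashMap<>(assigned);
    }

    Map<String, String> getAssignedWithSplits() {
      return assignedWithSplits;
    }
  }

  static final class Coordinator {
    private final Map<String, String> assignedPartitions;

    /** Pass null for a fresh start, or a previous snapshot to restore from it. */
    Coordinator(State restoreState) {
      this.assignedPartitions = restoreState == null
          ? new HashMap<>()
          : new HashMap<>(restoreState.getAssignedWithSplits());
    }

    void markAssigned(String partition, String splitId) {
      assignedPartitions.put(partition, splitId);
    }

    State snapshotState() {
      return new State(assignedPartitions);
    }

    Map<String, String> assignedPartitions() {
      return assignedPartitions;
    }
  }

  public static void main(String[] args) {
    Coordinator first = new Coordinator(null);
    first.markAssigned("queue-0", "0");
    State checkpoint = first.snapshotState();
    first.markAssigned("queue-1", "1"); // happens after the checkpoint

    Coordinator restored = new Coordinator(checkpoint);
    System.out.println(restored.assignedPartitions()); // contains only what the checkpoint saw
  }
}
```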

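The close method shuts down every connection that was opened to read metadata from the data source during splitting and is still open.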
public void close() {
  if (consumer != null) {
    consumer.shutdown();
  }
}
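Product Introduction
BitSail supports data synchronization among more than 20 heterogeneous data sources and provides a full-domain data integration solution covering offline, real-time, full, and incremental scenarios. It currently serves almost all business lines inside ByteDance, including well-known applications such as Douyin and Toutiao, and also supports the data integration needs of multiple Volcano Engine customers.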










