CREATE TABLE {teble} ON CLUSTER {cluster}AS {local_table}ENGINE= Distributed({cluster}, {database}, {local_table},{policy})

Distributed表 Insert 原理
本文主要是对Distributed表如何写入及如何分发做一下分析,略过SQL的词法解析、语法解析等步骤,从写入流开始,其构造方法如下:
DistributedBlockOutputStream(const Context & context_, StorageDistributed &storage_, const ASTPtr & query_ast_, const ClusterPtr & cluster_, boolinsert_sync_, UInt64 insert_timeout_);







可以发现每个shard对应的目录名是{darabse}@{hostname}:{tcpPort}的格式,如果多个副本会用,分隔。并且每个shard目录中还有个tmp目录,这个目录的设计在writeToShard()方法中做了解释,是为了避免数据文件在没写完就被发送到远端。
数据文件在本地写入的过程中会先写入tmp路径中,写完后通过硬链接link到shard目录,保证只要在shard目录中出现的数据文件都是完整写入的数据文件。
数据文件的命名是通过全局递增的数字加.bin命名,是为了在后续分发到远端节点保持顺序性。
5、数据如何分发到各个节点
细心的你可能已经发现在writeToShard()方法中有个requireDirectoryMonitor(),这个方法就是将shard目录注册监听,并通过专用类StorageDistributedDirectoryMonitor来实现数据文件的分发,根据不同配置可以实现逐一分发或批量分发。并且包含对坏文件的容错处理。



sharding_key_expr = buildShardingKeyExpression(sharding_key_, global_context,getColumns().getAllPhysical(), false);
const ExpressionActionsPtr & getShardingKeyExpr() const { returnsharding_key_expr; }
所以说sharding_key_expr最终主要就是由sharding_key决定的。
一般情况下getShardingKeyExpr()方法都为true,如果再满足shard数量大于1,就会对block进行拆分,由splitBlock()方法主要逻辑就是创建selector并使用selector进行切割,大致逻辑如下:




Distributed表在写入时会在本地节点生成临时数据,会产生写放大,所以会对CPU及内存造成一些额外消耗,建议尽量少使用Distributed表进行写操作; Distributed表写的临时block会把原始block根据sharding_key和weight进行再次拆分,会产生更多的block分发到远端节点,也增加了merge的负担; Distributed表如果是基于表函数创建的,一般是同步写,需要注意。
Distributed表 Select 流程



void SelectStreamFactory::createForShard(const Cluster::ShardInfo & shard_info,const String & query, const ASTPtr & query_ast,const Context & context, const ThrottlerPtr & throttler,BlockInputStreams & res){// 构造一个本地流方法auto emplace_local_stream = [&](){res.emplace_back(createLocalStream(query_ast, context, processed_stage));};// 构造一个远程流方法auto emplace_remote_stream = [&](){auto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, header, context, nullptr, throttler, external_tables, processed_stage);stream->setPoolMode(PoolMode::GET_MANY);if (!table_func_ptr)stream->setMainTable(main_table);res.emplace_back(std::move(stream));};// 获取settings配置const auto & settings = context.getSettingsRef();// prefer_localhost_replica默认为true,如果shard_info还是本地分片,进入以下逻辑if (settings.prefer_localhost_replica && shard_info.isLocal()){StoragePtr main_table_storage;// 根据是不是表函数方式使用不同逻辑获取main_table_storage,即一个IStorage// 其中表函数是指那些虚的表引擎,例如file、merge、remote、url、mysql这一类的// 因为这一类是不在本地存放数据的,需要另一种验证库表存在的方式if (table_func_ptr){const auto * table_function = table_func_ptr->as<ASTFunction>();TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function->name, context);main_table_storage = table_function_ptr->execute(table_func_ptr, context, table_function_ptr->getName());}elsemain_table_storage = context.tryGetTable(main_table.database, main_table.table);// 如果main_table_storage不存在,就尝试去其他server获取if (!main_table_storage){ProfileEvents::increment(ProfileEvents::DistributedConnectionMissingTable);if (shard_info.hasRemoteConnections()){LOG_WARNING(&Logger::get("ClusterProxy::SelectStreamFactory"),"There is no table " << main_table.database << "." << main_table.table<< " on local replica of shard " << shard_info.shard_num << ", will try remote replicas.");// 这里就是应用上面声明的匿名方法,使用远程流emplace_remote_stream();}else// 同理,使用本地流// 当然这里已经没有获取到本地StoragePtr,所以也一定会失败emplace_local_stream();return;}// 尝试将main_table_storage指针转为replicated*类型指针const auto * replicated_storage = dynamic_cast<const StorageReplicatedMergeTree *>(main_table_storage.get());// 如果不是ReplicatedMergeTree引擎表,使用本地server,如果是就要考虑各个副本的// 延迟情况,如果延迟不满足会在去寻找其他副本if (!replicated_storage){emplace_local_stream();return;}UInt64 max_allowed_delay = settings.max_replica_delay_for_distributed_queries;// 如果没设置最大延迟,依旧选择本地副本查询if (!max_allowed_delay){emplace_local_stream();return;}UInt32 local_delay = replicated_storage->getAbsoluteDelay();// 如果设置了最大延迟且本地延迟小于最大延迟,本地副本依然有效,选择本地副本// 这里的本地延迟时间是每次查询时不断更新的// 包含min_unprocessed_insert_time和max_processed_insert_time// 这两个值在zk的tables/{num}/replicas/{hostname}/路径下记录的// 这里的用意也是为了间接判断本机的负载情况if (local_delay < max_allowed_delay){emplace_local_stream();return;}// 如果以上逻辑都没有进入,说明本地延迟已经大于设置的最大延迟参数了,会执行以下代码ProfileEvents::increment(ProfileEvents::DistributedConnectionStaleReplica);LOG_WARNING(&Logger::get("ClusterProxy::SelectStreamFactory"),"Local replica of shard " << shard_info.shard_num << " is stale (delay: " << local_delay << "s.)");// 如果没有这是fallback,就不能使用本地副本,去尝试获取远程副本if (!settings.fallback_to_stale_replicas_for_distributed_queries){if (shard_info.hasRemoteConnections()){emplace_remote_stream();return;}elsethrow Exception("Local replica of shard " + toString(shard_info.shard_num)+ " is stale (delay: " + toString(local_delay) + "s.), but no other replica configured",ErrorCodes::ALL_REPLICAS_ARE_STALE);}// 如果没有远程副本可选,而且设置了fallback,则才会选择本地副本if (!shard_info.hasRemoteConnections()){emplace_local_stream();return;}// 构造lazily_create_stream方法,避免在主线程中进行连接auto lazily_create_stream = [pool = shard_info.pool, shard_num = shard_info.shard_num, query, header = header, query_ast, context, throttler,main_table = main_table, table_func_ptr = table_func_ptr, external_tables = external_tables, stage = processed_stage,local_delay]()-> BlockInputStreamPtr{auto current_settings = context.getSettingsRef();auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings).getSaturated(current_settings.max_execution_time);std::vector<ConnectionPoolWithFailover::TryResult> try_results;try{// 这里会去连接池中获取远端的entry,entry中包含着请求其他server的必要信息// getManyForTableFunction和getManyChecked方法最后都会调用getManyImpl// 只不过传入的TryGetEntry不同,在tryGetEntry中会去检查远端server的表的状态// 并且检查远端server中分布式表所使用到的表的延迟情况,// 以is_up_to_date(bool)表示if (table_func_ptr)try_results = pool->getManyForTableFunction(timeouts, ¤t_settings, PoolMode::GET_MANY);elsetry_results = pool->getManyChecked(timeouts, ¤t_settings, PoolMode::GET_MANY, main_table);}catch (const Exception & ex){if (ex.code() == ErrorCodes::ALL_CONNECTION_TRIES_FAILED)LOG_WARNING(&Logger::get("ClusterProxy::SelectStreamFactory"),"Connections to remote replicas of local shard " << shard_num << " failed, will use stale local replica");elsethrow;}double max_remote_delay = 0.0;for (const auto & try_result : try_results){if (!try_result.is_up_to_date)max_remote_delay = std::max(try_result.staleness, max_remote_delay);}// 下面是将得到的result进行聚合if (try_results.empty() || local_delay < max_remote_delay)return createLocalStream(query_ast, context, stage);else{std::vector<IConnectionPool::Entry> connections;connections.reserve(try_results.size());for (auto & try_result : try_results)connections.emplace_back(std::move(try_result.entry));return std::make_shared<RemoteBlockInputStream>(std::move(connections), query, header, context, nullptr, throttler, external_tables, stage);}};res.emplace_back(std::make_shared<LazyBlockInputStream>("LazyShardWithLocalReplica", header, lazily_create_stream));}elseemplace_remote_stream();}

PoolWithFailoverBase<TNestedPool>::getMany(size_t min_entries, size_t max_entries, size_t max_tries,const TryGetEntryFunc & try_get_entry,const GetPriorityFunc & get_priority,bool fallback_to_stale_replicas){......std::string fail_messages;bool finished = false;while (!finished){for (size_t i = 0; i < shuffled_pools.size(); ++i){if (up_to_date_count >= max_entries|| entries_count + failed_pools_count >= nested_pools.size()){finished = true;break;}ShuffledPool & shuffled_pool = shuffled_pools[i];TryResult & result = try_results[i];if (shuffled_pool.error_count >= max_tries || !result.entry.isNull())continue;std::string fail_message;// 这里就是调用了上面提到的TryGetEntryFunc方法来真正的获取entryresult = try_get_entry(*shuffled_pool.pool, fail_message);if (!fail_message.empty())fail_messages += fail_message + '\n';if (!result.entry.isNull()){++entries_count;if (result.is_usable){++usable_count;if (result.is_up_to_date)++up_to_date_count;}}else{LOG_WARNING(log, "Connection failed at try №"<< (shuffled_pool.error_count + 1) << ", reason: " << fail_message);ProfileEvents::increment(ProfileEvents::DistributedConnectionFailTry);shuffled_pool.error_count = std::min(max_error_cap, shuffled_pool.error_count + 1);if (shuffled_pool.error_count >= max_tries){++failed_pools_count;ProfileEvents::increment(ProfileEvents::DistributedConnectionFailAtAll);}}}}if (usable_count < min_entries)throw DB::NetException("All connection tries failed. Log: \n\n" + fail_messages + "\n",DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED);try_results.erase(std::remove_if(try_results.begin(), try_results.end(),[](const TryResult & r) { return r.entry.isNull() || !r.is_usable; }),try_results.end());// 以下代码主要是对结果进行排序std::stable_sort(try_results.begin(), try_results.end(),[](const TryResult & left, const TryResult & right){return std::forward_as_tuple(!left.is_up_to_date, left.staleness)< std::forward_as_tuple(!right.is_up_to_date, right.staleness);});......return try_results;}
近期文章推荐:
更多精彩内容欢迎关注公众号

文章转载自 ClickHouse周边,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




