暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

Distributed表引擎Insert和Select流程

ClickHouse周边 2021-06-04
1138
        ClickHouse依靠Distributed引擎实现了Distributed(分布式)表机制,在所有分片(本地表)上建立视图进行分布式查询,。它是一种特殊的表引擎,自身不会存储任何数据,而是通过读取或写入其他远端节点上的表进行数据处理的表引擎。该表引擎需要依赖各个节点的本地表来创建,本地表的存在是Distributed表创建的依赖条件,创建语句如下:
    CREATE TABLE {teble} ON CLUSTER {cluster}
    AS {local_table}
    ENGINE= Distributed({cluster}, {database}, {local_table},{policy})
        这里的policy一般可以使用随机(例如rand())或哈希(例如halfMD5hash(id))。
            再来看下ClickHouse集群节点配置文件,相关参数如下:
    • Distributed表 Insert 原理

            本文主要是对Distributed表如何写入及如何分发做一下分析,略过SQL的词法解析、语法解析等步骤,从写入流开始,其构造方法如下:

      DistributedBlockOutputStream(const Context & context_, StorageDistributed &
      storage_, const ASTPtr & query_ast_, const ClusterPtr & cluster_, bool
      insert_sync_, UInt64 insert_timeout_);
              如果insert_sync_为true,表示是同步写入,并配合insert_timeout_参数使用(insert_timeout_为零表示没有超时时间);如果insert_sync_为false,表示写入是异步。
      1、同步写入还是异步写入
              同步写入是指数据直写入实际的表中,而异步写入是指数据首先被写入本地文件系统,然后发送到远端节点。
              是否执行同步写入是由insert_sync决定的,最终是由是否配置insert_distributed_sync(默认为false)和owned_cluster值的或关系决定的,一般在使用MergeTree之类的普通表引擎时,通常是异步写入,但在使用表函数时(使用owned_cluster来判断是否是表函数),通常会使用同步写入。这也是在设计业务逻辑时需要注意的。
              owned_cluster是什么时候赋值的呢?
              可以发现在创建remote表时会根据remote_table_function_ptr参数对最终的owned_cluster_赋值为true。
      2、异步写入是如何实现的
              了解了什么时候使用同步写入什么时候异步写入后,再继续分析正式的写入过程,同步写入一般场景中涉及较少,这里主要对异步写入逻辑进行分析。outStream的write方法主逻辑如下:
              其实这个write方法是重写了virtual void IBlockOutputStream::write(const Block & block),所以节点在接收到流并调用流的write方法就会进入该逻辑中。并且根据insert_sync来决定走同步写还是异步写。
      3、写入本地节点还是远端节点
              主要还是对异步写入进行分析,其实writeAsync()最终的实现方法是writeAsyncImpl(),大致逻辑图如下:
              其中getShardsInfo()方法就是获取config.xml配置文件中获取集群节点信息,hasInternalReplication()就对应着配置文件中的internal_replication参数,如果为true,就会进入最外层的if逻辑,否则就会进入else逻辑。
              其中writeToLocal()方法是相同的,是指如果shard包含本地节点,优先选择本地节点进行写入;后半部分writeToShard()就是根据internal_replication参数的取值来决定是写入其中一个远端节点,还是所有远端节点都写一次。
      4、数据如何写入本地节点
              当然一般情况Distributed表还是基于ReplicatedMergeTree系列表进行创建,而不是基于表函数的,所以大多数场景还是会先写入本地再分发到远端节点。那写入Distributed表的数据是如何保证原子性落盘而不会在数据正在写入的过程中就把不完整的数据发送给远端其他节点呢?看下writeToShard()方法大致逻辑,如下:
              继续具体再看下源码的具体实现,如下:
              首先来了解下Distributed表在目录中的存储方式,默认位置都是/var/lib/clickhouse/data/{database}/{table}/在该目录下会为每个shard生成不同的目录,其中存放需要发送给该shard的数据文件,例如:

              可以发现每个shard对应的目录名是{darabse}@{hostname}:{tcpPort}的格式,如果多个副本会用,分隔。并且每个shard目录中还有个tmp目录,这个目录的设计在writeToShard()方法中做了解释,是为了避免数据文件在没写完就被发送到远端。

              数据文件在本地写入的过程中会先写入tmp路径中,写完后通过硬链接link到shard目录,保证只要在shard目录中出现的数据文件都是完整写入的数据文件。

              数据文件的命名是通过全局递增的数字加.bin命名,是为了在后续分发到远端节点保持顺序性。

      5、数据如何分发到各个节点

              细心的你可能已经发现在writeToShard()方法中有个requireDirectoryMonitor(),这个方法就是将shard目录注册监听,并通过专用类StorageDistributedDirectoryMonitor来实现数据文件的分发,根据不同配置可以实现逐一分发或批量分发。并且包含对坏文件的容错处理。

      分析到这,可能还有人会觉得云里雾里,觉得整个流程串不起来,其实这样写是为了先不影响Distributed表写入的主流程,明白了这个再附加上sharding_key拆分和权重拆分就很好理解了。

      上面提到过writeAsync()的最终实现方法是writeAsyncImpl,这个说法是没问题的,但是中间还有段关键逻辑,如下:
      getShardingKeyExpr()方法就是去获取sharding_key生成的表达式指针,该表达式是在创建表时就生成的,如下:
        sharding_key_expr = buildShardingKeyExpression(sharding_key_, global_context,
        getColumns().getAllPhysical(), false);
        那sharding_key和sharding_key_expr是什么关系呢?如下:
          const ExpressionActionsPtr & getShardingKeyExpr() const { return 
          sharding_key_expr; }

          所以说sharding_key_expr最终主要就是由sharding_key决定的。

          一般情况下getShardingKeyExpr()方法都为true,如果再满足shard数量大于1,就会对block进行拆分,由splitBlock()方法主要逻辑就是创建selector并使用selector进行切割,大致逻辑如下:

                  对于如何创建selector以及selector中都做了什么事儿,来具体看下源码截取,如下:

                  看splitBlock()方法,ClickHouse是利用createSelector()方法构造selector来进行后续的处理。在createSelector()方法中最重要的就是key_column和slot_to_shard。
                  key_column是通过sharding_key间接获得的,是为了根据主键列进行切割;slot_to_shard是shard插槽,这里就是为了处理权重,在后续向插槽中插入数据时就会结合config.xml中的weight进行按比例处理。
                  细节比较复杂这里不做太细致的分析,有兴趣可以自行看下(如template IColumn::Selector createBlockSelector())。
                  到此,对于Distributed表的写入流程的关键点就大致分析完了。篇幅有限有些细节没有做过多说明,有兴趣的可以自行再了解下。

          通过对Distributed表写入流程的分析,了解了该类型表的实际工作原理,所以在实际应用中有几个点还需要关注一下:
          1. Distributed表在写入时会在本地节点生成临时数据,会产生写放大,所以会对CPU及内存造成一些额外消耗,建议尽量少使用Distributed表进行写操作;
          2. Distributed表写的临时block会把原始block根据sharding_key和weight进行再次拆分,会产生更多的block分发到远端节点,也增加了merge的负担;
          3. Distributed表如果是基于表函数创建的,一般是同步写,需要注意。
          • Distributed表 Select 流程

                  Distributed表引擎不会真实存储数据,是ClickHouse提供的一个分布式查询引擎,其查询原理大致概括起来就是将server端接收到的查询请求进行重写,并发送到指定的多个server端去执行查询,最终由接到请求的server端进行汇总,最后返回给client端。这个过程可以通过源码来更清晰的了解以下。
                  首先,从BlockInputStreams StorageDistributed::read方法说起,因为从InterpreterSelectQuery*这类的查询都会调用BlockInputStreams 类型的read方法。
                  read方法主要是sql重写及根据表函数及库表的不同逻辑初始化SelectStreamFactory,executeQuery方法是查询的入口。

                  executeQuery方法主要是修改和设置一些配置,接下来是stream_factory的创建了,createForShard是个虚函数,具体实现如下:
            void SelectStreamFactory::createForShard(
            const Cluster::ShardInfo & shard_info,
            const String & query, const ASTPtr & query_ast,
            const Context & context, const ThrottlerPtr & throttler,
            BlockInputStreams & res)
            {
            // 构造一个本地流方法
            auto emplace_local_stream = [&]()
            {
            res.emplace_back(createLocalStream(query_ast, context, processed_stage));
            };

            // 构造一个远程流方法
            auto emplace_remote_stream = [&]()
            {
            auto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, header, context, nullptr, throttler, external_tables, processed_stage);
            stream->setPoolMode(PoolMode::GET_MANY);
            if (!table_func_ptr)
            stream->setMainTable(main_table);
            res.emplace_back(std::move(stream));
            };

            // 获取settings配置
            const auto & settings = context.getSettingsRef();

            // prefer_localhost_replica默认为true,如果shard_info还是本地分片,进入以下逻辑
            if (settings.prefer_localhost_replica && shard_info.isLocal())
            {
            StoragePtr main_table_storage;

            // 根据是不是表函数方式使用不同逻辑获取main_table_storage,即一个IStorage
            // 其中表函数是指那些虚的表引擎,例如file、merge、remote、url、mysql这一类的
            // 因为这一类是不在本地存放数据的,需要另一种验证库表存在的方式
            if (table_func_ptr)
            {
            const auto * table_function = table_func_ptr->as<ASTFunction>();
            TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function->name, context);
            main_table_storage = table_function_ptr->execute(table_func_ptr, context, table_function_ptr->getName());
            }
            else
            main_table_storage = context.tryGetTable(main_table.database, main_table.table);


            // 如果main_table_storage不存在,就尝试去其他server获取
            if (!main_table_storage)
            {
            ProfileEvents::increment(ProfileEvents::DistributedConnectionMissingTable);
            if (shard_info.hasRemoteConnections())
            {
            LOG_WARNING(
            &Logger::get("ClusterProxy::SelectStreamFactory"),
            "There is no table " << main_table.database << "." << main_table.table
            << " on local replica of shard " << shard_info.shard_num << ", will try remote replicas.");
            // 这里就是应用上面声明的匿名方法,使用远程流
            emplace_remote_stream();
            }
            else
            // 同理,使用本地流
            // 当然这里已经没有获取到本地StoragePtr,所以也一定会失败
            emplace_local_stream();

            return;
            }

            // 尝试将main_table_storage指针转为replicated*类型指针
            const auto * replicated_storage = dynamic_cast<const StorageReplicatedMergeTree *>(main_table_storage.get());

            // 如果不是ReplicatedMergeTree引擎表,使用本地server,如果是就要考虑各个副本的
            // 延迟情况,如果延迟不满足会在去寻找其他副本
            if (!replicated_storage)
            {
            emplace_local_stream();
            return;
            }

            UInt64 max_allowed_delay = settings.max_replica_delay_for_distributed_queries;

            // 如果没设置最大延迟,依旧选择本地副本查询
            if (!max_allowed_delay)
            {
            emplace_local_stream();
            return;
            }

            UInt32 local_delay = replicated_storage->getAbsoluteDelay();

            // 如果设置了最大延迟且本地延迟小于最大延迟,本地副本依然有效,选择本地副本
            // 这里的本地延迟时间是每次查询时不断更新的
            // 包含min_unprocessed_insert_time和max_processed_insert_time
            // 这两个值在zk的tables/{num}/replicas/{hostname}/路径下记录的
            // 这里的用意也是为了间接判断本机的负载情况
            if (local_delay < max_allowed_delay)
            {
            emplace_local_stream();
            return;
            }

            // 如果以上逻辑都没有进入,说明本地延迟已经大于设置的最大延迟参数了,会执行以下代码
            ProfileEvents::increment(ProfileEvents::DistributedConnectionStaleReplica);
            LOG_WARNING(
            &Logger::get("ClusterProxy::SelectStreamFactory"),
            "Local replica of shard " << shard_info.shard_num << " is stale (delay: " << local_delay << "s.)");

            // 如果没有这是fallback,就不能使用本地副本,去尝试获取远程副本
            if (!settings.fallback_to_stale_replicas_for_distributed_queries)
            {
            if (shard_info.hasRemoteConnections())
            {
            emplace_remote_stream();
            return;
            }
            else
            throw Exception(
            "Local replica of shard " + toString(shard_info.shard_num)
            + " is stale (delay: " + toString(local_delay) + "s.), but no other replica configured",
            ErrorCodes::ALL_REPLICAS_ARE_STALE);
            }

            // 如果没有远程副本可选,而且设置了fallback,则才会选择本地副本
            if (!shard_info.hasRemoteConnections())
            {
            emplace_local_stream();
            return;
            }

            // 构造lazily_create_stream方法,避免在主线程中进行连接
            auto lazily_create_stream = [
            pool = shard_info.pool, shard_num = shard_info.shard_num, query, header = header, query_ast, context, throttler,
            main_table = main_table, table_func_ptr = table_func_ptr, external_tables = external_tables, stage = processed_stage,
            local_delay]()
            -> BlockInputStreamPtr
            {
            auto current_settings = context.getSettingsRef();
            auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(
            current_settings).getSaturated(
            current_settings.max_execution_time);
            std::vector<ConnectionPoolWithFailover::TryResult> try_results;
            try
            {
            // 这里会去连接池中获取远端的entry,entry中包含着请求其他server的必要信息
            // getManyForTableFunction和getManyChecked方法最后都会调用getManyImpl
            // 只不过传入的TryGetEntry不同,在tryGetEntry中会去检查远端server的表的状态
            // 并且检查远端server中分布式表所使用到的表的延迟情况,
            // 以is_up_to_date(bool)表示
            if (table_func_ptr)
            try_results = pool->getManyForTableFunction(timeouts, &current_settings, PoolMode::GET_MANY);
            else
            try_results = pool->getManyChecked(timeouts, &current_settings, PoolMode::GET_MANY, main_table);
            }
            catch (const Exception & ex)
            {
            if (ex.code() == ErrorCodes::ALL_CONNECTION_TRIES_FAILED)
            LOG_WARNING(
            &Logger::get("ClusterProxy::SelectStreamFactory"),
            "Connections to remote replicas of local shard " << shard_num << " failed, will use stale local replica");
            else
            throw;
            }

            double max_remote_delay = 0.0;
            for (const auto & try_result : try_results)
            {
            if (!try_result.is_up_to_date)
            max_remote_delay = std::max(try_result.staleness, max_remote_delay);
            }

            // 下面是将得到的result进行聚合
            if (try_results.empty() || local_delay < max_remote_delay)
            return createLocalStream(query_ast, context, stage);
            else
            {
            std::vector<IConnectionPool::Entry> connections;
            connections.reserve(try_results.size());
            for (auto & try_result : try_results)
            connections.emplace_back(std::move(try_result.entry));

            return std::make_shared<RemoteBlockInputStream>(
            std::move(connections), query, header, context, nullptr, throttler, external_tables, stage);
            }
            };

            res.emplace_back(std::make_shared<LazyBlockInputStream>("LazyShardWithLocalReplica", header, lazily_create_stream));
            }
            else
            emplace_remote_stream();
            }
                    createForShard主要是决定选择本地还是远程副本的问题,下面继续看下getManyImpl方法

                    getManyImpl方法主要是决定用多少entries以及远程副本的策略,继续看getMany方法
              PoolWithFailoverBase<TNestedPool>::getMany(
              size_t min_entries, size_t max_entries, size_t max_tries,
              const TryGetEntryFunc & try_get_entry,
              const GetPriorityFunc & get_priority,
              bool fallback_to_stale_replicas)
              {
              ......

              std::string fail_messages;
              bool finished = false;
              while (!finished)
              {
              for (size_t i = 0; i < shuffled_pools.size(); ++i)
              {
              if (up_to_date_count >= max_entries
              || entries_count + failed_pools_count >= nested_pools.size())
              {
              finished = true;
              break;
              }

              ShuffledPool & shuffled_pool = shuffled_pools[i];
              TryResult & result = try_results[i];
              if (shuffled_pool.error_count >= max_tries || !result.entry.isNull())
              continue;

              std::string fail_message;
              // 这里就是调用了上面提到的TryGetEntryFunc方法来真正的获取entry
              result = try_get_entry(*shuffled_pool.pool, fail_message);

              if (!fail_message.empty())
              fail_messages += fail_message + '\n';

              if (!result.entry.isNull())
              {
              ++entries_count;
              if (result.is_usable)
              {
              ++usable_count;
              if (result.is_up_to_date)
              ++up_to_date_count;
              }
              }
              else
              {
              LOG_WARNING(log, "Connection failed at try №"
              << (shuffled_pool.error_count + 1) << ", reason: " << fail_message);
              ProfileEvents::increment(ProfileEvents::DistributedConnectionFailTry);

              shuffled_pool.error_count = std::min(max_error_cap, shuffled_pool.error_count + 1);

              if (shuffled_pool.error_count >= max_tries)
              {
              ++failed_pools_count;
              ProfileEvents::increment(ProfileEvents::DistributedConnectionFailAtAll);
              }
              }
              }
              }

              if (usable_count < min_entries)
              throw DB::NetException(
              "All connection tries failed. Log: \n\n" + fail_messages + "\n",
              DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED);

              try_results.erase(
              std::remove_if(
              try_results.begin(), try_results.end(),
              [](const TryResult & r) { return r.entry.isNull() || !r.is_usable; }),
              try_results.end());

              // 以下代码主要是对结果进行排序
              std::stable_sort(
              try_results.begin(), try_results.end(),
              [](const TryResult & left, const TryResult & right)
              {
              return std::forward_as_tuple(!left.is_up_to_date, left.staleness)
              < std::forward_as_tuple(!right.is_up_to_date, right.staleness);
              });

              ......

              return try_results;
              }

                      getMany方法就是真正获取entry并进行排序的过程,至此,可以知道ck是如何选择远端server的过程了。如果想要了解Distributed表真正的数据查询,需要继续分析上述提到过的三个匿名方法:emplace_local_stream、emplace_remote_stream、lazily_create_stream。这里不再具体分析,有兴趣自行了解下就好。
                      切记一个问题,分析了这么多,实际上都是在构造各种Stream并添加到Stream列表里,实际执行是通过IBlockInputStream::read()方法驱动流列表中流的流动,进而完成查询。

              近期文章推荐:

                      ClickHouse优化典藏

                      ClickHouse 之 Server Settings

                      ClickHouse那些年我们遇到过的问题

              更多精彩内容欢迎关注公众号

              文章转载自 ClickHouse周边,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

              评论