暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Druid实时数据处理机制及应用

兴盛优选技术社区 2022-07-05
1523


1.概述


Apache Druid诞生于一家广告数据分析平台公司,是一个支持海量数据实时分析的分布式数据库系统,适用于大规模实时数据导入,快速查询分析的OLAP场景,包括点击流分析,网络服务器性能监控,应用性能指标分析,数字营销分析,实时交易分析等。Druid具有时序数据库的一些显著特征,支持低延时数据摄取,按照一定的时间颗粒度对数据进行预聚合,能对历史和实时数据提供亚秒级查询,实现高效数据探索分析。

本文先从类LSM-tree索引结构,DataSource与Segment数据存储,内存化数据查询几个方面深入理解Druid架构设计思想与实时数据处理机制,然后结合实际业务场景阐述Druid时序数据处理方法,以及实践过程中遇到的问题与解决方案。

2.Druid架构设计解析


为实现大规模数据集实时数据处理与即席查询,通常需要在数据的高效摄入与快速查询之间做取舍与权衡。传统关系型数据库如果想在查询时有更快的响应速度,通常会牺牲一些数据写入的性能以完成索引的创建;反之,如果想获得更快的写入速度,往往要放弃一些索引的创建而导致在查询的时候付出较高的性能代价。Druid通过其独到的架构设计,基于DataSource与Segment的数据结构,以及一些巧妙的系统实现细节,既达到了高效的数据实时摄入又能提供性能卓越的快速查询。

图2-1 Druid总体架构

2.1 架构概览

Druid使用列存数据存储方式,采用Lambda架构分别处理实时数据和批量数据。实时数据处理部分面向写多读少的优化,批处理部分面向读多写少的优化。如图2-1所示,Druid总体架构主要包含以下节点:

◇实时节点(Realtime Node):即时摄入实时数据,以及生成Segment数据文件。

◇历史节点(Historical Node):加载已生成好的数据文件,以供数据查询。

◇代理节点(Broker Node):对外提供数据查询服务,并同时从实时节点与历史节点查询数据,合并后返回给调用方。

◇协调节点(Coordinator Node):负责历史节点的数据负载均衡,以及通过规则配置管理数据生命周期。同时,Druid集群还包含以下外部依赖:

◇元数据库(Metastore):存储Druid集群的原数据信息,比如Segment的相关信息,一般用MySQL或PostgreSQL。

◇分布式协调服务(Coordination):为Druid集群提供一致性协调服务的组件,通常为Zookeeper。

◇数据文件存储(DeepStorage):存储Segment数据文件,并供历史节点下载。对于单节点集群可以是本地磁盘,而对于分布式集群一般是HDFS。从数据流转的角度来看,实时流数据会被实时节点消费,然后实时节点将生成的Segment数据文件上传到数据文件存储库;而批量数据经过Druid集群消费后会被直接上传到数据文件存储库。同时,查询节点会响应外部的查询请求,并将分别从实时节点与历史节点查询到的结果合并后返回。

2.2 类LSM-tree索引结构

数据库的数据大多存储在磁盘上,而不论是机械硬盘还是固态硬盘,对磁盘数据 的顺序读写速度都远高于随机读写。传统关系型数据库广泛使用B+树(B+-tree)及其衍生树索引结构,需要较多的磁盘随机读写,并且随着插入的数据不断增多,B+树叶子节点会慢慢分裂,可能导致逻辑上原本连续的数据实际上存放在不同的物理磁盘块位置上,在做范围查询的时候会产生较高的磁盘I/O,严重影响性能。2008年Google关于Bigtable[3]的论文中引入了日志结构合并树LSM-tree(Log-Structured Merge-Tree)技术,其主要思想是将磁盘看作一个大的日志,每次都将新的数据及其索引结构添加到日志的最末端,以实现对磁盘的顺序操作,并通过将数据文件预排序的方式克服日志结构方法随机读性能较差的问题,从而提高索引性能。LSM-tree最大的特点是同时使用了两部分类树的数据结构来存储数据,并同时提供查询。其中一部分数据结构(C0树)存在于内存缓存(通常叫作memtable)中,负责接受新的数据插入更新以及读请求,并直接在内存中对数据进行排序;另一部分数据结构(C1树)存在于硬盘上(这部分通常叫作sstable),它们是由存在于内存缓存中的C0树冲写到磁盘而成的,主要负责提供读操作,特点是有序且不可被更改。LSM-tree的另一大特点是除了使用两部分类树的数据结构外,还会使用日志文件(称为commitlog)来为数据恢复做保障。这三类数据结构的协作顺序一般是:所有的新插入与更新操作都首先被记录到commit log中——该操作叫作WAL(Write Ahead Log),然后再写到memtable,最后当达到一定条件时数据会从memtable冲写到sstable,并抛弃相关的log数据;memtable与sstable可同时提供查询;当memtable出问题时,可从commit log与sstable中将memtable的数据恢复。LSM-tree的这种结构非常有利于数据的快速写入(理论上可以接近磁盘顺序写速度),但是不利于读,因为理论上读的时候可能需要同时从memtable和所有硬盘上的sstable中查询数据,这样显然会对性能造成较大的影响。

LSM-tree比较适合那些数据插入操作远多于数据更新删除操作与读操作的场景,Druid主要是为时序数据场景设计的,而该场景正好符合LSM-tree的优势特点,因此Druid索引架构吸取了LSM-tree的思想。Druid类LSM-tree架构中的实时节点(Realtime Node)负责消费实时数据,与经典LSM-tree架构不同的是,Druid不提供日志及实行WAL原则,实时数据首先会被直接加载进实时节点内存中的堆结构缓存区(相当于memtable),当条件满足时,缓存区里的数据会被冲写到硬盘上形成一个数据块(Segment Split),同时实时节点又会立即将新生成的数据块加载到内存中的非堆区,因此无论是堆结构缓存区还是非堆区里的数据,都能够被查询节点(Broker Node)查询。同时,实时节点会周期性地将磁盘上同一个时间段内生成的所有数据块合并为一个大的数据块(Segment)。这个过程在实时节点中称为Segment Merge操作,相当于LSM-tree架构中的数据合并操作(Compaction)。合并好的Segment会立即被实时节点上传到数据文件存储库(DeepStorage)中,随后历史节点通过协调节点从数据文件存储库将新生成的 Segment下载到其本地磁盘中,并通过分布式协调服务(Coordination)在集群中声明其从此刻开始负责提供该Segment的查询,当实时节点收到该声明后也会立即向集群声明其不再提供该Segment的查询,接下来查询节点会转从该历史节点查询此Segment的数据。而对于全局数据来说,查询节点会同时从实时节点(少量当前数据)与历史节点(大量历史数据)分别查询,然后做一个结果的整合,最后再返回给用户。

Druid的上述架构特点为其带来了如下显著的优势:

◇类LSM-tree架构使得Druid能够保证数据的高速写入,并且能够提供比较快速的实时查询,这十分符合许多时序数据的应用场景。

◇由于Druid在设计之初就不提供对已有数据的更改,以及不实现传统LSM-tree 架构中普遍应用的WAL原则,虽然这样导致了Druid不适应于某些需要数据更新的场景,也降低了数据完整性的保障,但Druid相对其它传统的LSM-tree架构实现来说减少了不少数据处理的工作量,因此在性能方面更胜一筹。

◇Druid对命令查询职责分离模式的借鉴也使得其组件职责分明、结构更加清晰 明了,方便针对不同模块进行针对性的优化。

2.3 DataSource与Segment数据结构

Druid在数据摄入之前,首先需要定义一个数据源(DataSource),DataSource可以理解为RDBMS中的表(Table)。如图2-2所示,DataSource的结构包含以下几个方面:

◇时间列(TimeStamp):表明每行数据的时间值,默认使用UTC时间格式且精确到毫秒级别。每个数据集合都必须有时间列,这个列是数据聚合的重要维度,Druid会将时间很近的一些数据行聚合在一起。另外,所有的查询都需要指定查询时间范围。

◇维度列(Dimension):维度用来标识数据行的各个类别信息。这些标识主要 用于过滤或者切片数据,维度列字段为字符串类型。

◇指标列(Metric):指标是用于聚合和计算的列。指标列字段通常为数值类型,计算操作通常包括Count、Sum和Mean等。

图2-2 DataSource示例

无论是实时数据消费还是批量数据处理,Druid在基于DataSource结构存储数据时可选择对任意的指标列进行聚合(Roll Up)操作。该聚合操作主要基于维度列与时间范围两方面的情况。

◇同维度列的值做聚合:所有维度列的值都相同时,这一类行数据符合聚合操 作。

◇对指定时间粒度内的值做聚合:在queryGranularity指定的范围,对时间列值为1分钟内的所有行,聚合操作相当于对数据表所有列做了Group By操作。图2-3所示Druid在数据存储时便可对数据进行聚合操作,该特点使得Druid不仅能够节省存储空间,而且能够提高聚合查询的效率。

DataSource是一个逻辑概念,而Segment是数据的实际物理存储格式,Druid正是通过Segment实现了对数据的横纵向切割(Slice and Dice)操作。从数据按时间分布的角度来看,通过参数segmentGranularity的设置,Druid将不同时间范围内的数据存储在不同的Segment数据块中,即所谓的数据横向切割。这种设计的好处在于:按时间范围查询数据时,仅需要访问对应时间段内的这些Segment数据块,而不需要进行全表数据范围查询,这使效率得到了极大的提高。同时,在Segment中也面向列进行数据压缩存储,即所谓的数据纵向切割。而且在Segment中使用了Bitmap等技术对数据的访问进行了优化,在生成索引文件的时候,对每个列的每个取值生成对应的Bitmap集合。areaId为101对应的Bitmap为“1001”,代表第1行和第4行的areaId为“101”。举一个查询的例子,假设要筛选 areaId=’101’andsubOrderId=’21122200210106625374899380’的数据,那么只需要把areaId=’101’对应的Bitmap“1001”和subOrderId对应的Bitmap“0101“进行一个and/&&操作,得到结果为“0100”,代表第1行满足筛选条件。通过Bitmap可以快速定位要读取的数据,加速查询速度。

图2-3 Segment 聚合示例

2.4 内存化数据查询

Druid历史节点主要是为了提供数据查询服务,历史节点在启动的时候,首先会检查本地缓存中已存在的Segment数据文件,然后从DeepStorage中下载目前不在本地磁盘上的Segment数据文件。无论是何种查询,历史节点都会首先将相关Segment数据文件从磁盘加载到内存,然后再提供查询服务。历史节点的查询效率受内存空间的影响很大:内存空间越大,查询时需要从磁盘加载数据的次数就越少,查询速度就越快;反之,查询时需要从磁盘加载数据的次数就多,查询速度就相对较慢。历史节点具有较佳的可扩展性与高可用性。新的历史节点被添加后,会通过Zookeeper被协调节点(Coordinator Node)发现,然后协调节点自动分配相关的Segment给它;原有的历史节点被移出集群后,同样会被协调节点发现,协调节点会将原本分配给它的Segment重新分配给其它处于工作状态的历史节点。

Druid使用Cache机制来提高查询效率。Druid提供了两类介质作为Cache以供择:

◇外部Cache,比如Memcached。

◇本地Cache,比如(查询)代理节点或历史节点的内存作为Cache。如果用代理节点的内存作为Cache,查询的时候会首先访问其Cache,只有当不命中的时候才会去访问历史节点与实时节点以查询数据。

3.Druid应用实践


兴盛优选早期在OLAP场景下涉及实时订单数据处理部分使用了Druid,本章节主要介绍相关应用细节,以及实践过程中遇到的问题与解决方案。

3.1 Druid应用架构简介

如图3-1所示,一个典型的Druid应用架构包含以下数据处理节点:

3-1 Druid应用架构示例

1)业务生产系统以Mysql为主,MySQL二进制日志binlog记录了所有的DDL和DML语句(除了数据查询语句select),且以严格的时间事件形式记录,保障事务完整和准确。通过canal工具解析MySQLbinlog,获取每一笔订单的实时PAID状态,将支付订单中相关字段如:订单号,金额,份数,门提,下单用户等指标信息,写入kafka集群。

2)Kafka-index-service是Druid自身携带的扩展插件,能够将kafka中流式数 据,通过Druid组件不断的摄取,使用时需要在common.runtime.properties 文件中的属性druid.extensions.loadLis添加druid-kafka-indexing-service。


创建Druid拉取kafka实时数据流的json代码文件,通过curl方式提交命令:curl-XPOST-H'Content-Type:application/json'-d @stream_data.jsonhttp://localhost:8090/druid/indexer/v1/supervisor通过Overlord节点页面控制台观察对应的数据摄取任务:


3)Druid在segment文件中存储数据指标,segment文件用时间参数划分分区。在基础设置中,会为每个时间间隔建立一个segment文件,该时间间隔能够在granularitySpec的segmentGranularity参数中配置。同时观察segment的文件大小,如果超过一定量应该改变时间间隔或者分区数据,而且对 partitionsSpec中的targetPartitionSize参数作出调整(推荐优先设置这个参数为500万行)。


4)Druid基于ApacheCalcite实现SQL查询,即通过Broker节点进行连接,用 pydruid或者JAVA,配置jdbc连接,同时也可以配置localcurl方式进行实时 SQL指标计算,将对应的结果集实时写入redis,供前端界面展示,查询实时数据。


3.2 Druid应用问题及解决方案

初期在部署应用Druid过程中,淌过不少应用坑,时常有意想不到的事件发生,给开发和运维带来了不小的挑战。随着应用的逐步深入,集群的稳定性和资源的使用率,逐步得到稳定的提升,同时实时数据处理与查询性能达到良好的效果。以下简述应用过程中踩的部分坑,以及对应的解决方案或优化处理。

1)任务参数不合理导致小Segment过多

通过Coordinator控制台可以查看到对应的data_source整体segment文件个数和文件大小。其中我们经常发现,经过一天的kafka流式摄取,会生成N个segment文件,但以小文件为主,在计算查询时,小文件过多,会影响实时查询的整体性能。


解决方案:

让Druid在繁重的查询压力下保持良好的操作性能,且应该让segment文件的大小在300mb至700mb之间。

i.调整任务json文件参数partitionsSpec中的targetPartitionSize等,合理生成任务数和文件大小。

ii.在凌晨或者数据流不大的情况下,启动compact任务,对文件进行合并操作,合并操作的参数,满足segment最优的原则。


2)不支持精准去重

Druid在设计之初,为了追求亚秒级的OLAP查询分析,对精准去重计算exact distinctcount不支持。如遇到准确去重计算时,会满足不了业务需求。目前druid只提供近似查询,牺牲部分数据的准确性,如 HyperUniques/Sketch:规则方式摄入时做计算,做近似去重查询。

解决方案:

i.在实时要求程度不高的情况下,可以通过时间换空间的方式,对需要计算的字 段做groupby查询,然后count(1)实现精准去重。

ii.对要做精准去重的字段,在摄入时将字段类型改成"type":"thetaSketch",在计算时通过调整SIZE大小,可以实现精准去重:


3)近似计算出现浮点数

在hyperUniqueCardinality近似计算时,常常会输出浮点数,导致where条件过滤计算时,无法使用equalTo进行计算。

解决方案:

此类计算问题通过在postAggregations中,增加一个JavaScript postaggregator计算过程,再利用Math.round进行四舍五入即可。

4)数据摄入Boolean类型字段值为NULL

字段类型为boolean类型,且做为维度层处理,最后摄取入Druid结果后变成了 NULL

解决方案:

Druid本身是无法将true/false之类的boolean类型作为维度的,可以考虑将"true"/"false"字符串作为维度存入。但是,如果自定义的Bean对象中,有 StringisTimeout="false"的属性存在,就不能直接使用JSON.toJSONString进行转换。因为toJSONString方法中会识别出"true"/"false"字符串,并将其自动转化为boolean类型。因此,需要通过Map<String,Object>将所有字段都存入,然后再调用JSON.toJSONString方法即可。

5)SegmentGranularity和QueryGranularit的配置问题

QueryGranularity是查询的最小时间粒度,它主要影响数据摄入过程和查询过程中的Roll-up,QueryGranularity越大,Roll-up以后的数据量越小,查询越快,但是同时也意味着不能查询更细时间粒度的数据。SegmentGranularity是Segment分段的时间粒度,比如SegmentGranularity为hour,那么Segment是按照小时分段。通常我们可以设置SegmentGranularity>=QueryGranularity,但这并不是必须遵守的。例如有一个DataSource的QueryGranularity为day,采用实时摄入的方式,如果遵守上述原则,SegmentGranularity至少是day,那么一天之内Segment都不能移交给历史节点,导致查询性能下降。

6)内存增量索引持久化阈值设置问题

Druid运行中的数据首先在内存增量索引中累积,此过程中会进行Rollup。当达 到一定阈值以后,采用异步线程将内存增量索引持久化到磁盘中,持久化后的索引采用倒排和Bitmap索引,适合查询,当达到SegmentGranularity设定的条件时,将所有的持久化索引合并成一个Segment并移交。决定持久化阈值的是intermediatePersistPeriod和maxRowsInMemory。intermediatePersistPeriod的含义是间隔多长时间进行持久化,maxRowsInMemory是内存增量索引中事件条数达到多少以后进行持久化,两个参数满足其一就会触发持久化。如果将参数设置过小,将会过多地进行持久化,将会影响查询性能;如果将参数设置过大,则会使用过多的内存,影响垃圾回收。实际操作中通过系统运行指标监控调整参数设置。

4.总结


Druid在架构设计上借鉴了LSM-tree及读写分离的思想,并且基于DataSource与Segment数据文件结构与组织方式,通过数据的预聚合,优化存储结构和内存使用,即提供了实时和离线批量数据摄入的能力,又能够在大规模数据集上实现高效实时数据消费与探索,在时序数据应用场景中满足了OLAP的核心功能。但Druid是一套比较复杂的分布式系统,为实现其完整的集群功能,不仅需要要熟悉Broker,Historical,MiddleManager,Coordinator等集群节点进程功能,优化各节点机器配置及对应的JVM与线程池配置,还要面对其外部依赖DeepStorage(HDFS),ZooKeeper,MetadataStorage,可能出现直接或间接影响Druid有效运行的各种问题,因而整套系统有较高的运维成本。这些都是我们在做技术选型时需要考虑的因素。

参考文献:
[1]https://druid.apache.org/docs/latest/design/architecture.html
[2]http://static.druid.io/docs/druid.pdf
[3]F.Chang,J.Dean,S.Ghemawat,W.C.Hsieh,D.A.Wallach,M.Burrows,T.Chandra,A.Fikes,andR.E.Gruber.Bigtable:A distributed storage system for structured data.ACM Transactions on Computer Systems (TOCS),26(2):4,2008.
文章转载自兴盛优选技术社区,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论