暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

深入Hbase原理(超详细)

Hadoop大数据开发 2021-07-05
1458

目录

前言

Hbase概述

Hbase中的核心概念

原理加强之Region拆分

HMaster的作用   主节点

Region的拆分

负载均衡

1.1 自动负载均衡流程

1.2 强制执行负载均衡

1.3 人为的移动

大小合并

大合并之Region合并

小合并之Hfile的合并

Hbase的数据导入

shell端脚本方式

使用步骤

Java程序方式

MR程序读取数据处理 并输出到Hbase中

前言

Hbase概述

我们都知道,Hbase是一个高可靠的、易扩展的、面向列式存储的分布式数据库系统,那既然是一款数据库管理系统,当然,它的数据借助于HDFS存储。具体路径为/hbase/data/default/table_name/region_name/cf_name/hfile

Hbase中的核心概念

region

1)表的行数据范围,将一张大的表划分成多个region,将region分配给不同的regionserver机器管理-------->构成分布式数据库

2)region中有

store:一个列族对应一个store

memorystore  :读写数据的内存  写数据(内存)---->对整个file中的数据进行排序

WALG:记录用户的操作行为   在进行写入数据时,若机器意外宕机时恢复用户数据的一种方式

storefile:内存对象flush到hdfs中形成hfile文件;storefile就是hfile的抽象对象

blockCache:提升查询的效率

其他名词概念

namespace:名称空间,类似于mysql中的数据库名

表   namespace:table_name

列族  columanFamily简称cf:对列的分类管理  创建表时要注意   1)列族不要太多(太多意味着更多的memorystore内存对象,占用更多的内存)  2)命令不要太长(储存的时候数据过长 冗余)

行键 rowkey:1)行的唯一标识   2)索引  3)一维排序  4)布隆过滤器

属性 qualifier  :具有稀疏性

值 value:存储的字节数据

原理加强之Region拆分

HMaster的作用   主节点

1)DDL有关的操作,比如建表需要master主节点  ;而读写数据却不需要master,从hbase读写数据的流程来看,确实没有接触到master

2)region具体分配到哪个regionserver上需要matser

Region的拆分

为什么要拆分region?

HBase是以表的形式存储数据的。一张表被划分为多个regions,regions分布在多个Region Server上,单个region只能分布在一个Region Server节点上,不能跨Region Server存放。

Region的两个重要属性:StartKey和EndKey,分别表示这个region所维护的rowkey范围。当做读/写请求时,寻址到该数据的rowkey落在某个start-end key范围内,那么就会定位到该范围内的region所在的Region Server上进行数据读/写。

随着数据的增多,region所要管理的数据也会越来越多,查询时,出现高并发热点问题的概率大大增加。单个机器需要来处理很多数据,并且当这个regionserver的负载过重

这个时候就需要对region进行拆分,均衡到不同的节点上进行管理!

热点问题

在创建表的时候建议 使用预分region表 (指定了切割点  ,对数据有分组规划)

1. 观察数据特点   注意重点字段(业务)

2 了解具体的表对应的业务,重点字段

3 根据业务 判断数据的读多(查询)  写多(插入)

4 读数据 写数据 (特点)  维度

5 避免热点问题  [插入热点 , 查询热点]

  • 138...大量的数据插入到同一个region 一个regionserver在服务  压力过大

  • 并发的查询138...数据  一个regionserver在服务 接收所有的并发查询  

怎么拆分?

1)自动拆分

1.1默认按照大小

计算公式为:
Min{

1^3*2*128M    256M

2^3*2*128M    2G

3^3*2*128M    6.75G

 10G          10G

当hbase表在regionserver上的region,如果region的大小到达一个阈值,这个region将会分为两个。

如果默认值情况下,一个表在一个regionserver上split的阈值是:
256MB(第一次split),2GB(第二次),6.75GB(第三次),10GB(第四次),10GB... 10GB

1.2keyPrefixRegionSplitPolicy(自定义) key的前缀

这种拆分是在原来的拆分基础上 ,增加了拆分点(splitPoint,拆分点就是Region被拆分时候的rowkey)的定义,保证有相同前缀的rowkey不会被拆分到不同的Region上

参数是 keyPrefixRegionSplitPolicy.prefix_length   rowkey:前缀长度

问题是 : 可能相同主题的数据会被分到不同的regionserver中 ,查询数据来源于不同的regionserver

1.3DelimitedKeyPrefixRegionSplitPolicy key的分隔符

和上一种查分策略一致 , 上一种是按照key的固定长度拆分的 , 这种按照的是分割符

DelimitedKeyPrefixRegionSplitPolicy.delimiter参数分割符

指定一个分隔符 能保证相同的主题数据在一个regionserver中

2)手动拆分(很少用,不需要)

    Examples:
    split 'tableName'
    split 'namespace:tableName'
    split 'regionName' # format: 'tableName,startKey,id'
    split 'tableName', 'splitKey'
    split 'regionName', 'splitKey'

    3)预分region

    shell端:

        hbase> create 'ns1:t1', 'f1', SPLITS => ['10', '20', '30', '40']
      hbase> create 't1', 'f1', SPLITS => ['10', '20', '30', '40']

      java客户端:

            byte[][] keys = new byte[][]{"e".getBytes(), "m".getBytes(), "t".getBytes()};
        // 建表 指定预分region的key(数组)
        admin.createTable(tableDescriptor, keys);

        一般在实际业务中首先会根据数据量和集群的规模还有业务特点,会进行预分区region,之后按照默认的拆分规则自动拆分即可

        可以设置拆分的策略  1)大小拆分  2)按照前缀拆分 3)按照分隔符拆分  4)手动拆分(几乎很少用

        region在底层是文件夹。

        拆分之后

        region拆分之后,面临的将是负载均衡或(region的移动) 

        region的移动会非常非常占用资源;因为region的移动会牵扯到文件、文件夹的创建、hfile的移动。

        会消耗大量的IO、网络、CPU资源被占用,最好不要手动频繁的去拆分。

        拆分带来的好处:将一个大的表拆分成多个region区域,交由不同的regionserver管理,形成分布式数据库,减少了某一个机器的的负载压力。

        负载均衡

        由Master的LoadBalancer线程周期性的在各个RegionServer间移动region维护负载均衡。

        1 经常被并发查询的数据不要存储在同一个RegionServer中 , 避免热点读取问题 .

        2 当一个机器上经过大量的插入或者删除数据以后 ,region合并或者分裂 ,那么机器上的region的数量会相差很大 .

        3 当新增了节点以后 , 应该去分配一些其他机器上的region数据

        4 当某个RegionServer宕机以后 , 这台机器上数据的分配

        region的执行

        Hbase中负载均衡的开关balanceSwitch默认是开启的 .

        hbase.balancer.period是设定负载均衡的参数,默认为每5分钟(300000毫秒)运行一次,参数配置通过hbase-site.xml文件。具体的设置格式如下:

        1. <property>

        2. <name>hbase.balancer.period</name>

        3. <value>300000</value>

        4. </property>


        1.1 自动负载均衡流程

        1)两个有效参数:MIN = floor(average)(表示下限)和MAX=ceil(average)(表示上限)。

        2)循环过载机器,将Region卸载到MAX数量,在小于等于MAX时停止排序Region(按时间新旧)。

        3)遍历最轻负载机器,分配Region直到Server达到MIN值。在大于等于MIN时停止。这些Region都是之前卸载的。可能没有足够地卸载Region让轻负载的机器达到MIN值,如果这样,在Region数等于neededRegions(轻负载机器的数量)时停止。可能我们分配了卸载的Region到轻负载机器,但是仍然有Region没有分配出去,这种情况下,本步骤完成,在下面步骤中再做处理。

        4)如果neededRegions是非零的值,遍历负载最重的机器,从每台机器上卸载一个Region,使得它们的值从MAX到MIN。

        5)现在有很多Region等待分配,遍历最轻的负载机器(多台),分配Region到MIN。

        6)如果仍然有Region没有分配,遍历最轻的负载机器(多台),这次分配Region到MAX。

        7)所有Server的Region数量时MAX或者MIN,另外,所有大于等于MAX的Server保证在均衡完成后都是MAX个Region。从而保证Region移动的最小数量。

        其中,轻负载指的是Region数量小于等于AVG,过载指大于等于AVG,所有的RegionServer都按照负载从大到小排序,存放在TreeMap中(保证先遍历过载Server)。

        1.2 强制执行负载均衡

        1)kill 掉不是Hmaster节点的 regionserver(slave2)

        2) hbase 16010 查看regionserver在slave2节点

        3) 在master节点启动 regionserver(slave2)

            master $>hbase-daemons.sh start regionserver

        4) hbase 16010 查看regionserver负载不均衡 (多刷新  0  6)

        1.     balance负载均衡指令,需使用开关模式,原因负载均衡比较耗费资源:

        2.     hbase(main):005:0> balance_switch true    //开启负载均衡

        3.     hbase(main):005:0> balancer                      执行负载均衡

        4.     hbase(main):005:0> balance_switch false   关闭负载均衡

        5.     hbase(main):005:0> balancer "force"           //强制负载均衡


        1.3 人为的移动

        当发生经常性的查询热点问题时, 您的请求总是请求到一个服务节点上 ,我们可以把这个机器中的数据理解成热数据 ,可以人为的拆分以后手动的移动到不同的机器上 实现查询热点的负载均衡 

        使用命令

        1. 1 切分

        2. Examples:

        3. split 'tableName'

        4. split 'namespace:tableName'

        5. split 'regionName' # format: 'tableName,startKey,id'

        6. split 'tableName', 'splitKey'

        7. split 'regionName', 'splitKey'

        8. 页面查看region情况 ,或者使用list_regions "tb_name"

        9. 2 移动 将某个region移动到某个机器上

        10. move  "a84afc7eb10797188c0af1534e143eb9" , "linux03,16020,1591844369905"


         

        大小合并

        大合并之Region合并

        为什么需要合并region呢?

        当一个Region被不断的写数据,达到Region的Split的阀值时(由属性hbase.hregion.max.filesize来决定,默认是10GB),该Region就会被Split成2个新的Region。随着业务数据量的不断增加,Region不断的执行Split,那么Region的个数也会越来越多

        一个表的Region过大,势必整个集群的Region个数也会增加,负载均衡后,每个RegionServer承担的Region个数也会增加

        当表中的数据被大量删除以后,表的行数急剧减少,region的个数没有变。region的个数没有变,每个region管理的数据行数少!合并region

        当然合并region也是占用大量的IO、网络、cpu资源的。

        小合并之Hfile的合并

        hbase是分布式列式存储数据库 , 其具有的基本功能就是对数据的增删改查 , 那么Hbase的数据是存储在HDFS上的,我们知道在HDFS中的数据是不允许随机修改和删除的!这与HDFS的功能相违背!

        hbase数据底层存储下HDFS系统中--------Hfile的形式存储的!在HDFS中Hfile的具体位置 hbase/data/default/table_name/region_name/cf_name/hfile_name   如下图所示

        hfile合并

        数据加载到memstore,数据越来越多直到memstore占满,再写入硬盘storefile中,每次写入形成一个单独Hfile,当Hfile达到一定的数量后,就会开始把小Hfile合并成大Hfile,因为Hadoop不擅长处理小文件,文件越大性能越好。

        hfile合并的时机

        1)有大量的更新数据  (修改、删除 操作)

        hfile在进行修改数据或者删除数据时,并不是真的删掉了这条数据,而是重新生成一个hfile,将数据放入其中,并打上标记。比如删除rk001 的数据,会

        新生成file文件,里面存有rk001 并带有墓碑标记的数据   客户端在拿数据时,服务器找到了两条rk001 一看有墓碑标记,就不再将此条数据读出返回给客户端

        2)有大量的小文件

        设计的不好,rowkey多,大量的冗余数据,列族多,store多,memorystore多,占用内存过多,容易溢出,会使机器的内存占用达到阈值,就会flush掉所有的memorystore中的数据,从而产生大量的

        storefile进而产生很多很多的hfile(列族多,内存小)

        3)有TTL过期数据

        数据大量删除------>hfile新生-------->hifle合并-------->region管理数据行数变少-------->region合并

        region的合并不要在业务高峰进行

        面试题:

        hbase表中的数据存储在哪?

        hbase中的数据物理存储在hdfs上。

        hdfs中的数据不是不允许随机修改和删除吗,你刚刚说hbase上的数据存储在hdfs上的,hbase既然是个数据库管理系统,那肯定有crud的功能吧,那这怎么解释呢?

        hdfs上的数据的确是不允许进行随机修改和删除的,而habse中的数据修改和删除也不是直接就修改和删掉了的,是通过数据的标记和文件的合并来实现修改和删除的

        比如,hbase数据的删除,对数据的删除,实际上生成了一个新的hfile,并给数据打上墓碑标记

        查询数据时,看到墓碑标记,就不会读取这条数据并返回

        等到一定条件,hfile合并之后,数据才会消失。

        Hbase的数据导入

        1)put  不好     直接将数据导入到Hbase中 , 底层使用RPC请求 , 每次put都会RPC请求,效率并不高

        一行插入一次  进行一次RPC请求

         * ------------------>hbase
         * ------------------>hbase
         * ------------------>hbase
         * ------------------>hbase

        用rpc请求,一条一条地通过rpc提交给regionserver去插入,效率极其低下!

          Table tb_dml = conn.getTable(TableName.valueOf("tb_dml"));

          Put rk0011 = new Put(Bytes.toBytes("rk0011"));
          rk0011.addColumn("cf1".getBytes(), "name".getBytes(), "guanyu".getBytes());
          rk0011.addColumn("cf1".getBytes(), "gender".getBytes(), "M".getBytes());

          Put rk0010 = new Put(Bytes.toBytes("rk0010"));
          rk0010.addColumn("cf1".getBytes(), "age".getBytes(), Bytes.toBytes(23));
          rk0010.addColumn("cf1".getBytes(), "name".getBytes(), "guanyu".getBytes());
          // 插入多行数据
          ArrayList<Put> puts = new ArrayList<>();
          puts.add(rk0011);
          puts.add(rk0010);
          // 插入数据
          tb_dml.put(puts);

          2)BufferedMutator

          使用缓存批次的形式发送RPC请求 , 减少RPC的请求次数 , 插入数据的效率会比put方式效率高, 但是对于大量的静态数据也并不适合!

            private static void mutationInsert() throws IOException {
            Connection conn = HbaseUtils.getHbaseConnection();
            BufferedMutator mutator = conn.getBufferedMutator(TableName.valueOf("tb_user"));
            Put put = new Put(Bytes.toBytes("rk1005")); // 行键
            put.addColumn(Bytes.toBytes("cf1"),// 列族
            Bytes.toBytes("name"),//属性
            Bytes.toBytes("OOO") //值
            );
            put.addColumn("cf1".getBytes(), "gender".getBytes(), "M".getBytes());
            // 插入数据
            mutator.mutate(put);
            // 将缓存在本地的数据 请求Hbase插入
            mutator.flush();
            }

            3) BulkLoad

            数据插入到hbase中的本质:将数据以Hfile文件格式存储在HDFS的指定位置 

            *** 将大量的静态数据转换成hfile文件  直接存储到指定的hdfs路径下  批量导入***

             

            使用工具:bulkloader 一个用于批最快速导入数据到 hbase 的工具/方法

            应用场景用于已经存在一批巨量的静态数据的情况!                   

            简介:使用 Bulk Load方式由于利用了HBase 的数据信息是按照特定格式存储在HDFS里的这一特性,直接在 HDFS中生成持久化的 HFile数据格式文件,然后完成巨量数据快速入库的操作,配合MapReduce完成这样的操作,不占用 Region资源,不会产生巨量的写入IO,所以需要较少的CPU和网络资源。Bulk Load 的实现原理是通过一个MapReduce Job来实现的,通过Job直接生成一个HBase的 HFile格式文件,用来形成一个特殊的 HBase数据表,然后直接将数据文件加载到运行的集群中。与使用Hbase API相比,使用Bulkload导入数据占用更少的CPU和网络资源

            3.1shell端脚本方式

            使用shell脚本,可以完成。该方法操作方便,但不够灵活。

            使用步骤

            静态数据如下:

            uid001,zss,23,F
            uid002,lss,13,M
            uid003,ww,22,M
            uid004,zl,34,F
            uid005,tq,43,M
            uid006,wb,55,F
            uid007,sj,98,M

            1先在linux01上新建txt,并上传到hdfs上

            2在hbase创建一张表   create  ‘test_data’,'cf1'

            3执行命令   根据数据生成hbase文件

            1. #info:列族名 name:属性 imp:表名 tsv/input:输入路径 tsv/output:输出路径

            2. hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dmporttsv.separator=, -Dimporttsv.columns='HBASE_ROW_KEY,cf1:name,cf1:age,cf1:gender' -Dimporttsv.bulk.output=/tsv/output imp tsv/input


            命令解释:  -Dimporttsv.separator=,     指定文件分割符号  为  ,

            Dimporttsv.columns='HBASE_ROW_KEY,cf1:name,cf1:age,cf1:gender   分割的行键    列族:属性

            /tsv/input:输入路径   tsv/output:输出路径

             hbase  org.apache.hadoop.hbase.mapreduce.ImportTsv  -Dimporttsv.separator=, -Dimporttsv.columns='HBASE_ROW_KEY,cf1:name,cf1:age,cf1:gender' -Dimporttsv.bulk.output=/tsv/output  test_data  /data

            4查看页面上文件夹显示

            此时是还没有hfile的

            5将生成的hfile文件导入到hbase表中

            hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles tsv/output test_data

            执行后,查看页面文件

            6解析验证是否导入的静态数据文件

            hbase hfile   -p -f     文件在hdfs中的路径     或者使用scan 'test_data'

            3.2Java程序方式

            使用Mr程序的自定义输入和输出文件类型 

            • 输入Hfile文件     输出普通的文件   表-->普通文件(数据导出)

            • 输入Hfile文件    输出Hfile文件   表-->表

            • 输入普通文件   输出Hflie  将普通数据导入到Hbase中

            *** 将普通的数据 (大量)  ---->   hfile格式的数据 [MR]

            [MR]:自定义输入和输出[hfile]  --> 指定路径 

            编写java程序  mapper读数据  处理  reducer 写数据     灵活,但需要另写代码

              MR程序读取数据处理 并输出到Hbase中
              MovieBean代码:
              package cn._51doit.movie;


              import org.apache.hadoop.io.Writable;


              import java.io.DataInput;
              import java.io.DataOutput;
              import java.io.IOException;


              /**
              */
              public class MovieBean implements Writable {
              private String movie ;
              private double rate ;
              private String timeStamp ;
              private String uid ;


              public String getMovie() {
              return movie;
              }


              public void setMovie(String movie) {
              this.movie = movie;
              }


              public double getRate() {
              return rate;
              }


              public void setRate(double rate) {
              this.rate = rate;
              }


              public String getTimeStamp() {
              return timeStamp;
              }


              public void setTimeStamp(String timeStamp) {
              this.timeStamp = timeStamp;
              }


              public String getUid() {
              return uid;
              }


              public void setUid(String uid) {
              this.uid = uid;
              }


              @Override
              public void write(DataOutput dataOutput) throws IOException {
              dataOutput.writeUTF(movie);
              dataOutput.writeDouble(rate);
              dataOutput.writeUTF(timeStamp);
              dataOutput.writeUTF(uid);
              }


              @Override
              public void readFields(DataInput dataInput) throws IOException {
              this.movie = dataInput.readUTF();
              this.rate = dataInput.readDouble() ;
              this.timeStamp = dataInput.readUTF();
              this.uid = dataInput.readUTF();


              }
              }

              MR程序:

                package cn._51doit.mr;


                import cn._51doit.movie.MovieBean;
                import com.google.gson.Gson;
                import com.google.gson.JsonSyntaxException;
                import org.apache.commons.lang3.StringUtils;
                import org.apache.hadoop.conf.Configuration;
                import org.apache.hadoop.fs.Path;
                import org.apache.hadoop.hbase.HBaseConfiguration;
                import org.apache.hadoop.hbase.client.Put;
                import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
                import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
                import org.apache.hadoop.hbase.mapreduce.TableReducer;
                import org.apache.hadoop.hbase.util.Bytes;
                import org.apache.hadoop.io.LongWritable;
                import org.apache.hadoop.io.Text;
                import org.apache.hadoop.mapreduce.Job;
                import org.apache.hadoop.mapreduce.Mapper;
                import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;


                import java.io.IOException;


                /**
                */
                public class LoadData2HbaseTable {


                /**
                * 处理每行数据 生成rowkey和movieBean
                */
                static class LoadData2HbaseTableMapper extends Mapper<LongWritable, Text, Text, MovieBean> {
                Gson gs = new Gson();
                Text k = new Text();
                @Override
                protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                try {
                String line = value.toString();
                MovieBean mb = gs.fromJson(line, MovieBean.class);
                String s = StringUtils.leftPad(mb.getMovie(), 5, '0');
                // 设计rowkey主键
                String rk = s + "_" + mb.getTimeStamp();
                k.set(rk);
                context.write(k, mb);
                } catch (Exception e) {
                e.printStackTrace();
                }


                }


                }


                static class LoadData2HbaseTableReducer extends TableReducer<Text , MovieBean , ImmutableBytesWritable> {


                @Override
                protected void reduce(Text key, Iterable<MovieBean> values, Context context) throws IOException, InterruptedException {
                String rk = key.toString();
                Put put = new Put(rk.getBytes()); // 行
                MovieBean mb = values.iterator().next();
                String movie = mb.getMovie();
                double rate = mb.getRate();
                String timeStamp = mb.getTimeStamp();
                String uid = mb.getUid();
                put.addColumn("cf".getBytes() , "movie".getBytes() , movie.getBytes()) ;
                put.addColumn("cf".getBytes() , "rate".getBytes() , Bytes.toBytes(rate)) ;
                put.addColumn("cf".getBytes() , "timeStamp".getBytes() , timeStamp.getBytes()) ;
                put.addColumn("cf".getBytes() , "uid".getBytes() , uid.getBytes()) ;
                context.write(null , put);
                }




                }


                public static void main(String[] args) throws Exception {
                Configuration conf = HBaseConfiguration.create();
                conf.set("hbase.zookeeper.quorum","linux01:2181,linux02:2181,linux03:2181");
                Job job = Job.getInstance(conf, "tohbase");


                job.setMapperClass(LoadData2HbaseTableMapper.class);


                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(MovieBean.class);


                FileInputFormat.setInputPaths(job , new Path("D:\\data\\movie\\input"));
                TableMapReduceUtil.initTableReducerJob("movie2",LoadData2HbaseTableReducer.class,job);


                job.waitForCompletion(true) ;




                }


                }

                要点整理:

                rowkey的设计

                rowkey决定了你的查询效率

                1)观察数据结构 数据特点  了解核心字段

                2)了解业务需求 :知道数据的作用,经常查询   经常插入数据

                3)经常查询   ---------》确定查询纬度    

                          1    movie

                           2    uid

                           3   movie+uid

                4)注意点:

                1 将查询纬度放在rk上;

                2 保证rowkey的唯一性;

                3 rowkey可以长度相同,方便排序

                4 划分属性含义  用特殊符号 _

                5 rowkey不要太长,容易造成大量冗余数据

                5)要考虑并发热点问题

                文章转载自Hadoop大数据开发,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                评论