目录
前言
Hbase概述
Hbase中的核心概念
原理加强之Region拆分
HMaster的作用 主节点
Region的拆分
负载均衡
1.1 自动负载均衡流程
1.2 强制执行负载均衡
1.3 人为的移动
大小合并
大合并之Region合并
小合并之Hfile的合并
Hbase的数据导入
shell端脚本方式
使用步骤
Java程序方式
MR程序读取数据处理 并输出到Hbase中
前言
Hbase概述
我们都知道,Hbase是一个高可靠的、易扩展的、面向列式存储的分布式数据库系统,那既然是一款数据库管理系统,当然,它的数据借助于HDFS存储。具体路径为/hbase/data/default/table_name/region_name/cf_name/hfile
Hbase中的核心概念
region
1)表的行数据范围,将一张大的表划分成多个region,将region分配给不同的regionserver机器管理-------->构成分布式数据库
2)region中有
store:一个列族对应一个store
memorystore :读写数据的内存 写数据(内存)---->对整个file中的数据进行排序
WALG:记录用户的操作行为 在进行写入数据时,若机器意外宕机时恢复用户数据的一种方式
storefile:内存对象flush到hdfs中形成hfile文件;storefile就是hfile的抽象对象
blockCache:提升查询的效率
其他名词概念
namespace:名称空间,类似于mysql中的数据库名
表 namespace:table_name
列族 columanFamily简称cf:对列的分类管理 创建表时要注意 1)列族不要太多(太多意味着更多的memorystore内存对象,占用更多的内存) 2)命令不要太长(储存的时候数据过长 冗余)
行键 rowkey:1)行的唯一标识 2)索引 3)一维排序 4)布隆过滤器
属性 qualifier :具有稀疏性
值 value:存储的字节数据
原理加强之Region拆分
HMaster的作用 主节点
1)DDL有关的操作,比如建表需要master主节点 ;而读写数据却不需要master,从hbase读写数据的流程来看,确实没有接触到master
2)region具体分配到哪个regionserver上需要matser
Region的拆分
为什么要拆分region?
HBase是以表的形式存储数据的。一张表被划分为多个regions,regions分布在多个Region Server上,单个region只能分布在一个Region Server节点上,不能跨Region Server存放。
Region的两个重要属性:StartKey和EndKey,分别表示这个region所维护的rowkey范围。当做读/写请求时,寻址到该数据的rowkey落在某个start-end key范围内,那么就会定位到该范围内的region所在的Region Server上进行数据读/写。
随着数据的增多,region所要管理的数据也会越来越多,查询时,出现高并发热点问题的概率大大增加。单个机器需要来处理很多数据,并且当这个regionserver的负载过重。
这个时候就需要对region进行拆分,均衡到不同的节点上进行管理!
热点问题
在创建表的时候建议 使用预分region表 (指定了切割点 ,对数据有分组规划)
1. 观察数据特点 注意重点字段(业务)
2 了解具体的表对应的业务,重点字段
3 根据业务 判断数据的读多(查询) 写多(插入)
4 读数据 写数据 (特点) 维度
5 避免热点问题 [插入热点 , 查询热点]
138...大量的数据插入到同一个region 一个regionserver在服务 压力过大
并发的查询138...数据 一个regionserver在服务 接收所有的并发查询
怎么拆分?
1)自动拆分
1.1默认按照大小
计算公式为:
Min{1^3*2*128M 256M
2^3*2*128M 2G
3^3*2*128M 6.75G
10G 10G
当hbase表在regionserver上的region,如果region的大小到达一个阈值,这个region将会分为两个。
如果默认值情况下,一个表在一个regionserver上split的阈值是:
256MB(第一次split),2GB(第二次),6.75GB(第三次),10GB(第四次),10GB... 10GB1.2keyPrefixRegionSplitPolicy(自定义) key的前缀
这种拆分是在原来的拆分基础上 ,增加了拆分点(splitPoint,拆分点就是Region被拆分时候的rowkey)的定义,保证有相同前缀的rowkey不会被拆分到不同的Region上
参数是 keyPrefixRegionSplitPolicy.prefix_length rowkey:前缀长度
问题是 : 可能相同主题的数据会被分到不同的regionserver中 ,查询数据来源于不同的regionserver
1.3DelimitedKeyPrefixRegionSplitPolicy key的分隔符
和上一种查分策略一致 , 上一种是按照key的固定长度拆分的 , 这种按照的是分割符
DelimitedKeyPrefixRegionSplitPolicy.delimiter参数分割符
指定一个分隔符 能保证相同的主题数据在一个regionserver中
2)手动拆分(很少用,不需要)
Examples:split 'tableName'split 'namespace:tableName'split 'regionName' # format: 'tableName,startKey,id'split 'tableName', 'splitKey'split 'regionName', 'splitKey'
3)预分region
shell端:
hbase> create 'ns1:t1', 'f1', SPLITS => ['10', '20', '30', '40']hbase> create 't1', 'f1', SPLITS => ['10', '20', '30', '40']
java客户端:
byte[][] keys = new byte[][]{"e".getBytes(), "m".getBytes(), "t".getBytes()};// 建表 指定预分region的key(数组)admin.createTable(tableDescriptor, keys);
一般在实际业务中,首先会根据数据量和集群的规模还有业务特点,会进行预分区region,之后按照默认的拆分规则自动拆分即可。
可以设置拆分的策略 1)大小拆分 2)按照前缀拆分 3)按照分隔符拆分 4)手动拆分(几乎很少用)
region在底层是文件夹。
负载均衡
1.1 自动负载均衡流程
1.2 强制执行负载均衡
1.3 人为的移动
大小合并
大合并之Region合并
小合并之Hfile的合并
Hbase的数据导入
Table tb_dml = conn.getTable(TableName.valueOf("tb_dml"));Put rk0011 = new Put(Bytes.toBytes("rk0011"));rk0011.addColumn("cf1".getBytes(), "name".getBytes(), "guanyu".getBytes());rk0011.addColumn("cf1".getBytes(), "gender".getBytes(), "M".getBytes());Put rk0010 = new Put(Bytes.toBytes("rk0010"));rk0010.addColumn("cf1".getBytes(), "age".getBytes(), Bytes.toBytes(23));rk0010.addColumn("cf1".getBytes(), "name".getBytes(), "guanyu".getBytes());// 插入多行数据ArrayList<Put> puts = new ArrayList<>();puts.add(rk0011);puts.add(rk0010);// 插入数据tb_dml.put(puts);
private static void mutationInsert() throws IOException {Connection conn = HbaseUtils.getHbaseConnection();BufferedMutator mutator = conn.getBufferedMutator(TableName.valueOf("tb_user"));Put put = new Put(Bytes.toBytes("rk1005")); // 行键put.addColumn(Bytes.toBytes("cf1"),// 列族Bytes.toBytes("name"),//属性Bytes.toBytes("OOO") //值);put.addColumn("cf1".getBytes(), "gender".getBytes(), "M".getBytes());// 插入数据mutator.mutate(put);// 将缓存在本地的数据 请求Hbase插入mutator.flush();}
3.1shell端脚本方式
使用步骤
3.2Java程序方式
输入Hfile文件 输出普通的文件 表-->普通文件(数据导出)
输入Hfile文件 输出Hfile文件 表-->表
输入普通文件 输出Hflie 将普通数据导入到Hbase中
MR程序读取数据处理 并输出到Hbase中MovieBean代码:package cn._51doit.movie;import org.apache.hadoop.io.Writable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;/***/public class MovieBean implements Writable {private String movie ;private double rate ;private String timeStamp ;private String uid ;public String getMovie() {return movie;}public void setMovie(String movie) {this.movie = movie;}public double getRate() {return rate;}public void setRate(double rate) {this.rate = rate;}public String getTimeStamp() {return timeStamp;}public void setTimeStamp(String timeStamp) {this.timeStamp = timeStamp;}public String getUid() {return uid;}public void setUid(String uid) {this.uid = uid;}@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(movie);dataOutput.writeDouble(rate);dataOutput.writeUTF(timeStamp);dataOutput.writeUTF(uid);}@Overridepublic void readFields(DataInput dataInput) throws IOException {this.movie = dataInput.readUTF();this.rate = dataInput.readDouble() ;this.timeStamp = dataInput.readUTF();this.uid = dataInput.readUTF();}}
MR程序:
package cn._51doit.mr;import cn._51doit.movie.MovieBean;import com.google.gson.Gson;import com.google.gson.JsonSyntaxException;import org.apache.commons.lang3.StringUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.hbase.mapreduce.TableReducer;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import java.io.IOException;/***/public class LoadData2HbaseTable {/*** 处理每行数据 生成rowkey和movieBean*/static class LoadData2HbaseTableMapper extends Mapper<LongWritable, Text, Text, MovieBean> {Gson gs = new Gson();Text k = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {try {String line = value.toString();MovieBean mb = gs.fromJson(line, MovieBean.class);String s = StringUtils.leftPad(mb.getMovie(), 5, '0');// 设计rowkey主键String rk = s + "_" + mb.getTimeStamp();k.set(rk);context.write(k, mb);} catch (Exception e) {e.printStackTrace();}}}static class LoadData2HbaseTableReducer extends TableReducer<Text , MovieBean , ImmutableBytesWritable> {@Overrideprotected void reduce(Text key, Iterable<MovieBean> values, Context context) throws IOException, InterruptedException {String rk = key.toString();Put put = new Put(rk.getBytes()); // 行MovieBean mb = values.iterator().next();String movie = mb.getMovie();double rate = mb.getRate();String timeStamp = mb.getTimeStamp();String uid = mb.getUid();put.addColumn("cf".getBytes() , "movie".getBytes() , movie.getBytes()) ;put.addColumn("cf".getBytes() , "rate".getBytes() , Bytes.toBytes(rate)) ;put.addColumn("cf".getBytes() , "timeStamp".getBytes() , timeStamp.getBytes()) ;put.addColumn("cf".getBytes() , "uid".getBytes() , uid.getBytes()) ;context.write(null , put);}}public static void main(String[] args) throws Exception {Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum","linux01:2181,linux02:2181,linux03:2181");Job job = Job.getInstance(conf, "tohbase");job.setMapperClass(LoadData2HbaseTableMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(MovieBean.class);FileInputFormat.setInputPaths(job , new Path("D:\\data\\movie\\input"));TableMapReduceUtil.initTableReducerJob("movie2",LoadData2HbaseTableReducer.class,job);job.waitForCompletion(true) ;}}
















