点击上方蓝色字体,选择“设为星标”
回复”资源“获取更多惊喜

文章目录
一、MyHbaseSink
1、继承RichSinkFunction<输入的数据类型>类
2、实现open方法,创建连接对象
3、实现invoke方法,批次写入数据到Hbase
4、实现close方法,关闭连接
二、HBaseUtil工具类
一、MyHbaseSink
1、继承RichSinkFunction<输入的数据类型>类
2、实现open方法,创建连接对象
// 创建连接
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 获取全局配置文件,并转为ParameterTool
ParameterTool params =
(ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
//创建一个Hbase的连接
connection = HBaseUtil.getConnection(
params.getRequired("hbase.zookeeper.quorum"),
params.getInt("hbase.zookeeper.property.clientPort", 2181)
);
// 获取系统当前时间
lastInvokeTime = System.currentTimeMillis();
}
3、实现invoke方法,批次写入数据到Hbase
@Override
public void invoke(Tuple2<String, Double> value, Context context) throws Exception {
String rk = value.f0;
//创建put对象,并赋rk值
Put put = new Put(rk.getBytes());
// 添加值:f1->列族, order->属性名 如age, 第三个->属性值 如25
put.addColumn("f1".getBytes(), "order".getBytes(), value.f1.toString().getBytes());
puts.add(put);// 添加put对象到list集合
//使用ProcessingTime
long currentTime = System.currentTimeMillis();
//开始批次提交数据
if (puts.size() == maxSize || currentTime - lastInvokeTime >= delayTime) {
//获取一个Hbase表
Table table = connection.getTable(TableName.valueOf("database:table"));
table.put(puts);//批次提交
puts.clear();
lastInvokeTime = currentTime;
table.close();
}
}
4、实现close方法,关闭连接
@Override public void close() throws Exception { connection.close(); }
二、HBaseUtil工具类
Hbase的工具类,用来创建Hbase的Connection

文章转载自暴走大数据,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。








