暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink写出数据到HBase的Sink实现

暴走大数据 2020-08-10
4032


点击上方蓝色字体,选择“设为星标”

回复”资源“获取更多惊喜

大数据技术与架构
点击右侧关注,大数据开发领域最强公众号!

暴走大数据
点击右侧关注,暴走大数据!

文章目录

    • 一、MyHbaseSink

      • 1、继承RichSinkFunction<输入的数据类型>类

      • 2、实现open方法,创建连接对象

      • 3、实现invoke方法,批次写入数据到Hbase

      • 4、实现close方法,关闭连接

    • 二、HBaseUtil工具类

一、MyHbaseSink

1、继承RichSinkFunction<输入的数据类型>类

public class MyHbaseSink extends RichSinkFunction<Tuple2<String, Double>> {    private transient Integer maxSize = 1000;    private transient Long delayTime = 5000L;    public MyHbaseSink() {    }    public MyHbaseSink(Integer maxSize, Long delayTime) {        this.maxSize = maxSize;        this.delayTime = delayTime;    }    private transient Connection connection;    private transient Long lastInvokeTime;    private transient List<Put> puts = new ArrayList<>(maxSize);

2、实现open方法,创建连接对象

  // 创建连接    @Override    public void open(Configuration parameters) throws Exception {        super.open(parameters);        // 获取全局配置文件,并转为ParameterTool        ParameterTool params =                (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();        //创建一个Hbase的连接        connection = HBaseUtil.getConnection(                params.getRequired("hbase.zookeeper.quorum"),                params.getInt("hbase.zookeeper.property.clientPort", 2181)        );        // 获取系统当前时间        lastInvokeTime = System.currentTimeMillis();    }

3、实现invoke方法,批次写入数据到Hbase

   @Override    public void invoke(Tuple2<String, Double> value, Context context) throws Exception {        String rk = value.f0;        //创建put对象,并赋rk值        Put put = new Put(rk.getBytes());        // 添加值:f1->列族, order->属性名 如age, 第三个->属性值 如25        put.addColumn("f1".getBytes(), "order".getBytes(), value.f1.toString().getBytes());        puts.add(put);// 添加put对象到list集合        //使用ProcessingTime        long currentTime = System.currentTimeMillis();        //开始批次提交数据        if (puts.size() == maxSize || currentTime - lastInvokeTime >= delayTime) {            //获取一个Hbase表            Table table = connection.getTable(TableName.valueOf("database:table"));            table.put(puts);//批次提交            puts.clear();            lastInvokeTime = currentTime;            table.close();        }    }

4、实现close方法,关闭连接

   @Override    public void close() throws Exception {        connection.close();    }


二、HBaseUtil工具类

  • Hbase的工具类,用来创建Hbase的Connection

public class HBaseUtil {
/**
* @param zkQuorum zookeeper地址,多个要用逗号分隔
* @param port zookeeper端口号
* @return connection
*/

public static Connection getConnection(String zkQuorum, int port) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", zkQuorum);
conf.set("hbase.zookeeper.property.clientPort", port + "");

Connection connection = ConnectionFactory.createConnection(conf);
return connection;
}
}

欢迎点赞+收藏
欢迎转发至朋友圈


文章转载自暴走大数据,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论