暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

如何用Go语言快速方便操作HBase2.0.x —— 基于github.com/pingcap/go-hbase的Hack实践

SunKangShanghai 2022-06-14
2319


封面题图:安提基特拉(Antikythera)机械复原想象图,该机械是为了计算天体在天空中的位置而设计的古希腊青铜机器,属于模拟计算机【百度百科】。该机器于1900年在希腊安提基特拉岛附近的安提基特拉沉船里被发现。制造年代约在西元前150年到100年之间。现藏于希腊国家考古博物馆。引用该图是感叹于远古时代人类技术成就,技术是人类超越自身局限性的工具,我们要正视人类文明的脆弱性与对技术的依赖性,在利用技术同时,也要避免匍匐在技术力量面前,完全成为其附庸。


注:本文同步以“古翠码翁” ID发布于CSDN Web3社区,转载请注明出处。


一、引言

     最近在项目实践中,涉及到使用Go语言操作HBase数据库,本文是相关的经验总结。

      用程序操作HBase数据有多种方法,包括HBase提供的RPC接口,Apache Thrift工具或者Apache Phoenix工具等。其中用原生RPC接口方式简单直接,不需要部署额外第三方组件,直接引用相应程序开发包即可。

      Go语言具有丰富的生态环境,除了标准库,还有各种开源开发包,安装方便,如果开发包不满足业务需求还可以直接修改源代码来适配。

按照笔者的业务逻辑,需要用Go语言处理一批外部数据,然后写入到HBase中,单位时间内数据吞吐量不大,使用简单直接的操作方式即可满需求,开发周期尽量做到短平快。经过初步调研,找到若干基于Go语言的HBase备选库,包括tsuna/go-hbase,pingcap/go-hbase,Lazyshot/go-hbase等,相应代码均在github上开源。

      然而在使用中,上述几个开源库操作工作环境部署的HBase时均出现各种问题,有不同的服务端程序报错信息出现。经过排查分析,发现一个可能原因是笔者使用的HBase版本是2.0.x,这个版本相对于HBase1.0.x有较大升级,可能存在一些底层接口不兼容。之所以说是可能原因,是因为这些开源的兼容版本要求只给出了>HBase0.9x版,并没有给出说明是否兼容2.0以上版本。

      解决办法通常有三个——其一,重新部署HBase,降低到相应版本;其二,继续寻找支持2.0以上版本的库;其三,修改库兼容工作环境的HBase版本。重新部署环境依赖于运维同事,能否找到兼容的库基于之前经验不确定性较高,求人不如求己,于是选择第三个方法,直接Hack现有库以兼容当前版本HBase。

      这里选择pingcap/go-hbase库作为修改对象,原因是代码结构比较清晰,接口简单,使用方便。按照这个库的官方说法:Derived from Lazyshot/go-hbase. Add some new features and fix some bugs.

      这个库源代码库地址:https://github.com/pingcap/go-hbase  代码树形结构图如图1所示,可见文件并不多,并不是一个复杂项目。


├── action.go

├── action_test.go

├── admin.go

├── admin_test.go

├── call.go

├── client.go

├── client_ops.go

├── client_test.go

├── column.go

├── column_test.go

├── conn.go

├── del.go

├── del_test.go

├── get.go

├── get_test.go

├── iohelper

│   ├── multireader.go

│   ├── pbbuffer.go

│   └── utils.go

├── LICENSE

├── proto

│   ├── AccessControl.pb.go

│   ├── Admin.pb.go

│   ├── Aggregate.pb.go

│   ├── Authentication.pb.go

│   ├── Cell.pb.go

│   ├── Client.pb.go

│   ├── ClusterId.pb.go

│   ├── ClusterStatus.pb.go

│   ├── Comparator.pb.go

│   ├── Encryption.pb.go

│   ├── ErrorHandling.pb.go

│   ├── Filter.pb.go

│   ├── FS.pb.go

│   ├── HBase.pb.go

│   ├── HFile.pb.go

│   ├── LoadBalancer.pb.go

│   ├── MapReduce.pb.go

│   ├── Master.pb.go

│   ├── MultiRowMutation.pb.go

│   ├── RegionServerStatus.pb.go

│   ├── RowProcessor.pb.go

│   ├── RPC.pb.go

│   ├── SecureBulkLoad.pb.go

│   ├── Snapshot.pb.go

│   ├── Tracing.pb.go

│   ├── VisibilityLabels.pb.go

│   ├── WAL.pb.go

│   └── ZooKeeper.pb.go

├── protobuf

│   ├── AccessControl.proto

│   ├── Admin.proto

│   ├── Aggregate.proto

│   ├── Authentication.proto

│   ├── Cell.proto

│   ├── Client.proto

│   ├── ClusterId.proto

│   ├── ClusterStatus.proto

│   ├── Comparator.proto

│   ├── Encryption.proto

│   ├── ErrorHandling.proto

│   ├── Filter.proto

│   ├── FS.proto

│   ├── HBase.proto

│   ├── HFile.proto

│   ├── LoadBalancer.proto

│   ├── MapReduce.proto

│   ├── Master.proto

│   ├── MultiRowMutation.proto

│   ├── RegionServerStatus.proto

│   ├── RowProcessor.proto

│   ├── RPC.proto

│   ├── SecureBulkLoad.proto

│   ├── Snapshot.proto

│   ├── Tracing.proto

│   ├── VisibilityLabels.proto

│   ├── WAL.proto

│   └── ZooKeeper.proto

├── put.go

├── put_test.go

├── README.md

├── result.go

├── result_test.go

├── scan.go

├── scan_test.go

├── service_call.go

├── types.go

└── utils.go

图1. 代码树形结构


二、实现步骤


2.1 源库错误提示

在确保已经安装好go开发环境前提下,安装go-hbase开发包,运行 go get github.com/pingcap/go-hbase,默认这个包会下载到 $GOPATH/pkg/mod/github.com/pingcap/go-hbase@xxxx 路径下

安装好后,开发一个简单Demo,重现一下错误。demo.go代码图2所示,zkHosts是zookeeper集群节点地址,根据实际情况设置,zkRoot是hbase所在根路径,也需要根据实际情况设置,这里是/hbase-unsecure。程序工作很简单,连接HBase,向"demo_tab"数据表的"info"列簇的列"val_1”和"val_2"分别写入数据。


// demo.go

package main

import (

"log"
"fmt"
hbase "github.com/pingcap/go-hbase"
)

func main() {

zkHosts := []string{"zk-node1:2181", "zk-node2:2181", "zk-node3:2181"}
zkRoot := "/hbase-unsecure"
hbTab := "demo_tab" // HBase table name
hbClient, err := hbase.NewClient(zkHosts, zkRoot)
if err != nil {

    log.Fatal(err)

}
defer hbClient.Close()
rowKey := "row_1"  // make a row key
fm := "info" // column family name
p := hbase.NewPut([]byte(rowKey))
p.AddStringValue(fm, "val_1", "a")
p.AddStringValue(fm, "val_2", "b")
// Put the row into HBase
res, err := hbClient.Put(hbTab, p)
if err != nil {
    log.Fatal(err)
}
if res {
    fmt.Println("Put data successfully!")
} else {
    fmt.Println("Failed to put data! :(")
}
return

}

~

                                       图2. demo.go 代码

编译执行上述代码,在HBase-2.0.x环境下会产生图3的提示,错误信息是:org.apache.hadoop.hbase.exceptions.UnknownProtocolException: Is this a pre-hbase-1.0.0 or asynchbase client? Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by reverse Scan.

图3. 返回错误提示,协议不兼容

根据返回错误信息提示,应该是协议不兼容问题,在2.0.x版本中使用了1.0.x的接口,出错位置大是在.LocateRegion方法中。


2.2 修改代码解决问题

根据提示定位错误后,经过研究,发现是2.0.x版本HBase在响应客户端定位RegionServer操作时候(LocateRegion),不再支持GetRequest请求,而是换成了ScanRequest请求,加入对应的处理代码逻辑应该能解决问题。
通常情况下,go开发环境下载的第三方包依赖会存放在$GOPATH/pkg/mod 路径下,具体到本案例,是在 $GOPATH/pkg/mod/github.com/pingcap/go-hbase@xxxxx.yyyyy中。其中GOPATH是环境变量,依据实际情况有所不同,@之后是版本信息。

2.2.1 修改片段之一
在打开文件go-hbase/client.go,找到方法LocateRegion,在375行,将这段的方法调用"call := newCall(&proto.GetRequest "完全注释掉,换成修改后使用ScanRequest的代码块,如图4所示。这部分是对呼叫RPC协议的修改,将原来的GetRequest请求变成ScanRequest请求,除了与Scan相关必要参数外,其余参数与原来GetRequest设置一致,另外,注意Scan:Reversed 属性要设置成true。

/* 注释掉对 GetRequest 请求的调用

call := newCall(&proto.GetRequest{

       Region: &proto.RegionSpecifier{

           Type:  proto.RegionSpecifier_REGION_NAME.Enum(),

           Value: metaRegionName,

       },

       Get: &proto.Get{

           Row: regionRow,

           Column: []*proto.Column{&proto.Column{

               Family: []byte("info"),

           }},

           ClosestRowBefore: pb.Bool(true),

       },

   })

*/

// 换成对 ScanRequest的调用

call := newCall(&proto.ScanRequest{

       Region: &proto.RegionSpecifier{            

           Type:  proto.RegionSpecifier_REGION_NAME.Enum(),            

           Value: metaRegionName,        

        },        

       Scan: &proto.Scan{            

              Reversed: pb.Bool(true),            

              StartRow: regionRow,            

              Column: []*proto.Column{&proto.Column{

                    Family: []byte("info"), 

              }},

         },        

         NumberOfRows: pb.Uint32(1),    

      })


图4. 在LocateRegion中添加ScanRequest发送请求

图5. LocateRegion添加ScanRequest请求截图


2.2.2 修改片段之

继续在LocateRegion方法中,找到解析返回结果部分,在response := <-call.responseCh下方。这部分的switch...case语句中需要添加一个case处理,以处理ScanResponse结果。添加代码如图6所示。
case *proto.ScanResponse部分就是对返回结果处理,如果想进一步了解GetResult()返回的结果数据结构,可以到go-hbase/result.go查看源代码。


switch r := response.(type) {

case *proto.GetResponse:

res := r.GetResult()
if res == nil {
    return nil, errors.Errorf("Empty region: [table=%s] [row=%q] [region_row=%q]", table, row, regionRow)
}
rr := NewResultRow(res)
region, err := c.parseRegion(rr)
if err != nil {
    return nil, errors.Trace(err)
}
c.updateRegionCache(table, region)
return region, nil

case *exception:

return nil, errors.New(r.msg)

case *proto.ScanResponse:  // 添加对ScanResponse的case处理

res := r.GetResults()
if res == nil {
    return nil, errors.Errorf("Empty region: [table=%s] [row=%q] [region_row=%q]", table, row, regionRow)
}
rr := NewResultRow(res[0])
region, err := c.parseRegion(rr)
if err != nil {
    return nil, errors.Trace(err)
}
c.updateRegionCache(table, region)
return region, nil

default:

log.Warnf("Unknown response - %T - %v", r, r)

}

图6. 添加ScanResponse数据处理部分

2.2.3 修改片段之三

实践过程中,发现pingcap/go-hbase库中并没有设置HBase的EffectiveUser的接口,搜索代码发现这个有效用户信息被硬编码写在了程序中并且固定为"pingcap“,这点需要注意,如果用户的环境不存在这个账户,使用时候会报错账户没有权限。
一个较好的解决方法是将这个有效用户设置接口开放出来,根据需求设置。这里笔者就快速有效更改一下,将EffectiveUser由"pingcap"改成"hbase",以跟实际使用环境相符。具体做法是打开go-hbase/conn.go文件,定位到writeConnectionHeader方法,大约是在文件214行。然后找到EffectiveUser: pb.String("pingcap") 这一行代码,改成EffectiveUser: pb.String("hbase"),如图7所示,修改保存大功告成。


func (c *connection) writeConnectionHeader() error {

buf := iohelper.NewPbBuffer()
service := pb.String(ServiceString[c.serviceType])
err := buf.WritePBMessage(&proto.ConnectionHeader{
    UserInfo: &proto.UserInformation{

        EffectiveUser: pb.String("hbase"),

    },
    ServiceName: service,
})
if err != nil {
    return errors.Trace(err)
}
err = buf.PrependSize()
if err != nil {
    return errors.Trace(err)
}
_, err = c.conn.Write(buf.Bytes())
if err != nil {
return errors.Trace(err)
}
return nil

}

图7. 在go-hbase/conn.go中设置EffectiveUser


三、测试结果

继续使用上面demo.go的例子,在修改库之前执行,结果出现的错误提示已经由图3给出。
按照本文第二部分所述步骤修改引用包代码后,我们重新编译运行demo.go,得到如下返回信息:

yyyy/mm/dd hh:mm:ss client.go:169: [debug] connect root region server... host_name:"zk-node4" port:16020 start_code:1655113251448

Put data successfully!

上述输出内容显示数据写入成功,与demo.go代码逻辑相符。再到hbase shell中,运行命令 scan 'demo_tab' 数据,返回结果如图8,可以看到,demo.go程序中对应的数据已经写入到HBase的表中。

图8. HBase中数据表scan操作返回的结果


四、结语

针对golang操作HBase-2.0.x的部分程序开发包,或许存在接口不兼容问题,本文通过修改代码给出了一个协议兼容的方案。按照文中步骤操作,并运行实例测试,结果显示能够实现Go语言操作HBase-2.0.x版本的目的。需要注意的是,本文给出的解决方案是一个快速临时的方案,并没有经过大数据量和大规模并发操作测试,据此实现,在高并发高数据吞吐量情况下可能存在未知风险,如果应用项对安全性和高可用性方面有较高需求,建议使用其他更稳妥方案。另外,当前HBase最新版本已经到3.0以上,最新版本HBase的兼容性测试将在未来有需要时候进行。
















文章转载自SunKangShanghai,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论