const LOCK_PREFIX = "/service/lock"
var (
cli *clientv3.Client
dialTimeout = 2 * time.Second
requestTimeout = 10 * time.Second
kv clientv3.KV
)
/**
不要 client, err := clientv3.New 这样无法为全局的变量cli赋值
因为:= 是声明一个临时变量 所以注意下 小坑
*/
func init() {
ctx := context.Background()
var err error
cli, err = clientv3.New(clientv3.Config{
Endpoints: []string{"10.101.175.176:2379", "10.101.175.176:2389", "10.101.175.176:2399"},
DialTimeout: dialTimeout,
Context: ctx,
})
if err != nil {
log.Println("connect failed,err:", err)
}
kv = clientv3.NewKV(cli)
log.Println("connected")
}
/**
测试添加 获取key
**/
func TestConnectClient(t *testing.T) {
defer cli.Close()
_, err := kv.Put(cli.Ctx(), "foo", "bar")
if err != nil {
log.Fatal(err)
}
resp, err := kv.Get(cli.Ctx(), "foo")
if err != nil {
log.Fatal(err)
}
for _, ev := range resp.Kvs {
fmt.Printf("%s=>%s\n", ev.Key, ev.Value)
}
}
/**
测试分布式锁
**/
func TestLock(t *testing.T) {
defer cli.Close()
block := make(chan int)
updateDb := func(key string) {
session, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal("concurrency.NewSession err:", err)
}
defer session.Close()
lock := concurrency.NewMutex(session, LOCK_PREFIX)
err = lock.Lock(context.TODO())
if err != nil {
log.Fatal("lock err:", key, err)
time.Sleep(time.Second)
} else {
log.Printf("DB操作%s->占用锁5秒", key)
block <- 1
time.Sleep(5 * time.Second)
lock.Unlock(context.TODO())
return
}
}
go updateDb("1")
go updateDb("2")
go updateDb("3")
go updateDb("4")
<-block
<-block
<-block
<-block
}
/**
测试节点监听
**/
func TestWatcher(t *testing.T) {
defer cli.Close()
key := "foo"
_, err := kv.Put(cli.Ctx(), key, "bar")
if err != nil {
log.Fatal(err)
}
go func() {
watch := cli.Watch(cli.Ctx(), key)
for wresp := range watch {
for _, ev := range wresp.Events {
log.Printf("%s %q:%q", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}()
cli.Put(cli.Ctx(), key, "bar1")
cli.Delete(cli.Ctx(), key)
cli.Put(cli.Ctx(), key, "bar1")
cli.Delete(cli.Ctx(), key)
cli.Put(cli.Ctx(), key, "bar2")
time.Sleep(2 * time.Second)
}
/**
测试根据节点前缀监听
**/
func TestWatchWithPrefix(t *testing.T) {
defer cli.Close()
path := "/task/cmd/"
go func() {
watch := cli.Watch(cli.Ctx(), path, clientv3.WithPrefix())
for wresp := range watch {
for _, ev := range wresp.Events {
log.Printf("%s %q:%q", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}()
key := path + "task1"
cli.Put(cli.Ctx(), key, "bar1")
cli.Delete(cli.Ctx(), key)
key = path + "task2"
cli.Put(cli.Ctx(), key, "bar1")
cli.Delete(cli.Ctx(), key)
cli.Put(cli.Ctx(), key, "bar2")
time.Sleep(2 * time.Second)
}
/**
测试服务注册,并提供三种保持服务存活的方式
**/
func TestLease(t *testing.T) {
cli2, err := clientv3.New(clientv3.Config{
Endpoints: []string{"10.101.175.176:2379", "10.101.175.176:2389", "10.101.175.176:2399"},
DialTimeout: dialTimeout,
Context: context.Background(),
})
if err != nil {
log.Println("connect failed,err:", err)
}
defer cli2.Close()
path := "/task/node/"
go func() {
watch := cli2.Watch(cli2.Ctx(), path, clientv3.WithPrefix())
for wresp := range watch {
for _, ev := range wresp.Events {
log.Printf("%s %q:%q", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}()
lease, _ := cli.Grant(cli.Ctx(), 1+2)
/**
服务注册1
*/
keep := func() {
timer := time.NewTimer(1*time.Second)
for {
select {
case <-timer.C:
resp, err := cli.KeepAliveOnce(context.TODO(), lease.ID)
if err != nil {
log.Println(err)
} else {
log.Println(resp.ResponseHeader)
timer.Reset(1*time.Second)
}
}
}
}
/**
服务注册方式2
*/
keep = func() {
resp, _ := cli.KeepAlive(context.TODO(), lease.ID)
for respone := range resp{
log.Println(respone.ResponseHeader)
}
}
/**
服务注册方式3
*/
keep = func() {
_,_ = cli.KeepAlive(context.TODO(), lease.ID)
}
go keep()
kv.Put(cli.Ctx(), path+"1", "1.1.1.1", clientv3.WithLease(lease.ID))
//time.Sleep(2 * time.Second)
//cli.Close()
time.Sleep(60 * time.Second)
}
文章转载自一把尺子,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




