暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

ETCD go client 测试

一把尺子 2018-01-02
224
const LOCK_PREFIX = "/service/lock"
var (
   cli            *clientv3.Client
   dialTimeout    = 2 * time.Second
   requestTimeout = 10 * time.Second
   kv             clientv3.KV
)
/**
不要 client, err := clientv3.New  这样无法为全局的变量cli赋值
因为:= 是声明一个临时变量  所以注意下  小坑
*/

func init() {
   ctx := context.Background()
   var err error
   cli, err = clientv3.New(clientv3.Config{
       Endpoints:   []string{"10.101.175.176:2379", "10.101.175.176:2389", "10.101.175.176:2399"},
       DialTimeout: dialTimeout,
       Context:     ctx,
   })
   if err != nil {
       log.Println("connect failed,err:", err)
   }
   kv = clientv3.NewKV(cli)
   log.Println("connected")
}
/**
测试添加 获取key
**/

func TestConnectClient(t *testing.T) {
   defer cli.Close()
   _, err := kv.Put(cli.Ctx(), "foo", "bar")
   if err != nil {
       log.Fatal(err)
   }
   resp, err := kv.Get(cli.Ctx(), "foo")
   if err != nil {
       log.Fatal(err)
   }
   for _, ev := range resp.Kvs {
       fmt.Printf("%s=>%s\n", ev.Key, ev.Value)
   }
}
/**
测试分布式锁
**/

func TestLock(t *testing.T) {
   defer cli.Close()
   block := make(chan int)
   updateDb := func(key string) {
       session, err := concurrency.NewSession(cli)
       if err != nil {
           log.Fatal("concurrency.NewSession err:", err)
       }
       defer session.Close()
       lock := concurrency.NewMutex(session, LOCK_PREFIX)
       err = lock.Lock(context.TODO())
       if err != nil {
           log.Fatal("lock err:", key, err)
           time.Sleep(time.Second)
       } else {
           log.Printf("DB操作%s->占用锁5秒", key)
           block <- 1
           time.Sleep(5 * time.Second)
           lock.Unlock(context.TODO())
           return
       }
   }
   go updateDb("1")
   go updateDb("2")
   go updateDb("3")
   go updateDb("4")
   <-block
   <-block
   <-block
   <-block
}
/**
测试节点监听
**/

func TestWatcher(t *testing.T) {
   defer cli.Close()
   key := "foo"
   _, err := kv.Put(cli.Ctx(), key, "bar")
   if err != nil {
       log.Fatal(err)
   }
   go func() {
       watch := cli.Watch(cli.Ctx(), key)
       for wresp := range watch {
           for _, ev := range wresp.Events {
               log.Printf("%s %q:%q", ev.Type, ev.Kv.Key, ev.Kv.Value)
           }
       }
   }()
   cli.Put(cli.Ctx(), key, "bar1")
   cli.Delete(cli.Ctx(), key)
   cli.Put(cli.Ctx(), key, "bar1")
   cli.Delete(cli.Ctx(), key)
   cli.Put(cli.Ctx(), key, "bar2")
   time.Sleep(2 * time.Second)
}
/**
测试根据节点前缀监听
**/

func TestWatchWithPrefix(t *testing.T) {
   defer cli.Close()
   path := "/task/cmd/"
   go func() {
       watch := cli.Watch(cli.Ctx(), path, clientv3.WithPrefix())
       for wresp := range watch {
           for _, ev := range wresp.Events {
               log.Printf("%s %q:%q", ev.Type, ev.Kv.Key, ev.Kv.Value)
           }
       }
   }()
   key := path + "task1"
   cli.Put(cli.Ctx(), key, "bar1")
   cli.Delete(cli.Ctx(), key)
   key = path + "task2"
   cli.Put(cli.Ctx(), key, "bar1")
   cli.Delete(cli.Ctx(), key)
   cli.Put(cli.Ctx(), key, "bar2")
   time.Sleep(2 * time.Second)
}
/**
测试服务注册,并提供三种保持服务存活的方式
**/

func TestLease(t *testing.T) {
   cli2, err := clientv3.New(clientv3.Config{
       Endpoints:   []string{"10.101.175.176:2379", "10.101.175.176:2389", "10.101.175.176:2399"},
       DialTimeout: dialTimeout,
       Context:     context.Background(),
   })
   if err != nil {
       log.Println("connect failed,err:", err)
   }
   defer cli2.Close()
   path := "/task/node/"
   go func() {
       watch := cli2.Watch(cli2.Ctx(), path, clientv3.WithPrefix())
       for wresp := range watch {
           for _, ev := range wresp.Events {
               log.Printf("%s %q:%q", ev.Type, ev.Kv.Key, ev.Kv.Value)
           }
       }
   }()
   lease, _ := cli.Grant(cli.Ctx(), 1+2)
   /**
   服务注册1
    */

   keep := func() {
       timer := time.NewTimer(1*time.Second)
       for {
           select {
           case <-timer.C:
               resp, err := cli.KeepAliveOnce(context.TODO(), lease.ID)
               if err != nil {
                   log.Println(err)
               } else {
                   log.Println(resp.ResponseHeader)
                   timer.Reset(1*time.Second)
               }
           }
       }
   }
   /**
   服务注册方式2
    */

   keep = func() {
       resp, _ := cli.KeepAlive(context.TODO(), lease.ID)
       for respone := range resp{
           log.Println(respone.ResponseHeader)
       }
   }
   /**
   服务注册方式3
    */

   keep = func() {
       _,_ = cli.KeepAlive(context.TODO(), lease.ID)
   }
   go keep()
   kv.Put(cli.Ctx(), path+"1", "1.1.1.1", clientv3.WithLease(lease.ID))
   //time.Sleep(2 * time.Second)
   //cli.Close()
   time.Sleep(60 * time.Second)
}


文章转载自一把尺子,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论