ETCD是coreOS开源的用于共享配置和服务发现的分布式,一致性的KV存储系统。是一款类似于zk有望取代复杂的zk的用go语言开发的存储系统。

基本概念

etcd有着几方面的优势:

  • 一致性协议: ETCD使用Raft协议, ZK使用ZAB(类PAXOS协议),前者容易理解,方便工程实现;
  • 运维方面:ETCD方便运维,ZK难以运维;
  • 项目活跃度:ETCD社区与开发活跃,ZK已经很臃肿维护困难;
  • API:ETCD提供HTTP+JSON, gRPC接口,跨平台跨语言,ZK需要使用其客户端;
  • 访问安全方面:ETCD支持HTTPS访问,ZK在这方面缺失;

etcd的使用场景和zk相似

  • 配置管理
  • 服务注册与发现
  • 分布式队列
  • 分布式锁

安装

单点部署

安装比较简单,直接去开源的github上去下在压缩包,然后解压就有对应的可执行文件,可以将可执行文件etcd,etcdctl复制到/usr/bin下面使用

集群部署

静态部署用命令直接启动

node1

etcd -name niub1 -debug \
-initial-advertise-peer-urls http://node1-ip:2380 \
-listen-peer-urls http://node1-ip:2380 \
-listen-client-urls http://node1-ip:2379,http://127.0.0.1:2379 \
-advertise-client-urls http://node1-ip:2379 \
-initial-cluster-token etcd-cluster-1 \
-initial-cluster niub1=http://node1-ip:2380,niub2=http://node2-ip:2380,niub3=http://node3-ip:2380 \
-initial-cluster-state new  >> ./etcd.log 2>&1 &

node2

etcd -name niub2 -debug \
-initial-advertise-peer-urls http://node2-ip:2380 \
-listen-peer-urls http://node2-ip:2380 \
-listen-client-urls http://node2-ip:2379,http://127.0.0.1:2379 \
-advertise-client-urls http://node2-ip:2379 \
-initial-cluster-token etcd-cluster-1 \
-initial-cluster niub1=http://node1-ip:2380,niub2=http://node2-ip:2380,niub3=http://node3-ip:2380 \
-initial-cluster-state new  >> ./etcd.log 2>&1 &

node3

etcd -name niub3 -debug \
-initial-advertise-peer-urls http://node3-ip:2380 \
-listen-peer-urls http://node3-ip:2380 \
-listen-client-urls http://node3-ip:2379,http://127.0.0.1:2379 \
-advertise-client-urls http://node3-ip:2379 \
-initial-cluster-token etcd-cluster-1 \
-initial-cluster niub1=http://node1-ip:2380,niub2=http://node2-ip:2380,niub3=http://node3-ip:2380 \
-initial-cluster-state new  >> ./etcd.log 2>&1 &

我们来看一下基本参数

ETCD_NAME –节点名称
ETCD_DATA_DIR -指定节点的数据存储目录
ETCD_LISTEN_PEER_URLS -监听URL,用于与其他节点通讯
ETCD_LISTEN_CLIENT_URLS –暴露自己的同时最好新增一个127.0.0.1的监听地址,便于etcdctl调用,当然用0.0.0.0也是可以的

ETCD_INITIAL_ADVERTISE_PEER_URLS -  告知集群其他节点url
ETCD_INITIAL_CLUSTER -  告知集群其他节点url.
ETCD_INITIAL_CLUSTER_STATE -  静态模式部署 new
ETCD_INITIAL_CLUSTER_TOKEN -集群的识别码
ETCD_ADVERTISE_CLIENT_URLS -告知客户端url, 也就是服务的url,不能包含127.0.0.1

然后就可以检查一下对集群情况了

etcdctl member list
curl http://10.10.0.14:2379/v2/members

两种方式都能返回三个节点的相关情况,也可以使用

etcdctl cluster-health

这样etcd的集群就搭建成功了。

正常会将其加入到系统服务中,首先创建设置配置文件

cat /etc/etcd/etcd.conf

# [member]
ETCD_NAME="etcd-2"
ETCD_DATA_DIR="/data/etcd/"
#ETCD_WAL_DIR=""
#ETCD_SNAPSHOT_COUNT="10000"
#ETCD_HEARTBEAT_INTERVAL="100"
#ETCD_ELECTION_TIMEOUT="1000"
#ETCD_LISTEN_PEER_URLS="http://localhost:2380"
#ETCD_LISTEN_CLIENT_URLS="http://localhost:2379"
ETCD_LISTEN_PEER_URLS="http://0.0.0.0:7001"
ETCD_LISTEN_CLIENT_URLS="http://0.0.0.0:4001"
#ETCD_MAX_SNAPSHOTS="5"
#ETCD_MAX_WALS="5"
#ETCD_CORS=""
#
#[cluster]
#ETCD_INITIAL_ADVERTISE_PEER_URLS="http://localhost:2380"
ETCD_INITIAL_ADVERTISE_PEER_URLS="http://172.32.148.128:7001"
# if you use different ETCD_NAME (e.g. test), set ETCD_INITIAL_CLUSTER value for this name, i.e. "test=http://..."
#ETCD_INITIAL_CLUSTER="default=http://localhost:2380"
ETCD_INITIAL_CLUSTER="etcd-1=http://172.32.148.127:7001,etcd-2=http://172.32.148.128:7001,etcd-3=http://172.32.148.129:7001,etcd-4=http://172.32.148.130:7001"
ETCD_INITIAL_CLUSTER_STATE="new"
#ETCD_INITIAL_CLUSTER_TOKEN="etcd-cluster"
#ETCD_ADVERTISE_CLIENT_URLS="http://localhost:2379"
ETCD_ADVERTISE_CLIENT_URLS="http://172.32.148.128:4001"
#ETCD_DISCOVERY=""
#ETCD_DISCOVERY_SRV=""
#ETCD_DISCOVERY_FALLBACK="proxy"
#ETCD_DISCOVERY_PROXY=""
#ETCD_STRICT_RECONFIG_CHECK="false"
#ETCD_AUTO_COMPACTION_RETENTION="0"
.......

然后增加开机启动配置

cat /uusr/lib/systemd/system/etcd.service

[Unit]
Description=Etcd Server
After=network.target
After=network-online.target
Wants=network-online.target

[Service]
Type=notify
WorkingDirectory=/var/lib/etcd/
EnvironmentFile=-/etc/etcd/etcd.conf
User=root
# set GOMAXPROCS to number of processors
#ExecStart=/bin/bash -c "GOMAXPROCS=$(nproc) /usr/bin/etcd --name=\"${
ETCD_NAME}\" --data-dir=\"${
ETCD_DATA_DIR}\" --listen-client-urls=\"${
ETCD_LISTEN_CLIENT_URLS}\""


ExecStart=/bin/bash -c "GOMAXPROCS=$(nproc) /usr/bin/etcd --name=\"${
ETCD_NAME}\" --data-dir=\"${
ETCD_DATA_DIR}\" --listen-client-urls=\"${
ETCD_LISTEN_CLIENT_URLS}\" --listen-peer-urls=\"${
ETCD_LISTEN_PEER_URLS}\" --advertise-client-urls=\"${
ETCD_ADVERTISE_CLIENT_URLS}\" --initial-advertise-peer-urls=\"${
ETCD_INITIAL_ADVERTISE_PEER_URLS}\" --initial-cluster=\"${
ETCD_INITIAL_CLUSTER}\" --initial-cluster-state=\"${
ETCD_INITIAL_CLUSTER_STATE}\""


Restart=on-failure
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target

当然我们可以使用别人的rpm包来安装,就有现成的配置文件,我们在上面修改就行了。

下面我们来来系统启动etcd

systemctl daemon-reload
systemctl enable etcd.service
systemctl start etcd.service

可以检查集群了,有一些需要主要的地方,一个就是服务的用户要有对应目录的权限。

k8s部署

一般部署在每个master节点上,组成一个集群。

使用

直接参考官方的md文件。就是正常的key/value类型的数据库的使用方法,类似于redis的使用。比如

写数据
#etcdctl put foo bar
#etcdctl put fool bar1 --lease=1234abcd
读数据
#etcdctl get foo
#etcdctl get foo --rev=3
删除数据
#etcdctl del foo
Watch机制
#etcdctl watch foo
租约TTL
#etcdctl lease grant 10
#etcdctl put –lease=3269xxx foo bar
#etcdctl lease revoke 3269xxx
#etcdctl lease keep-alive 3269xxx

etcd碎片整理(defragmentation)

$ etcdctl defrag Finished defragmenting etcd member[127.0.0.1:2379]

对每个节点的defrag时间需要错开,不能同时进行。

client

etcd/clientv3 is the official Go etcd client for v3.

基本实例

package main

import (
    "time"
    "github.com/coreos/etcd/clientv3"
    "fmt"
    "context"
    "github.com/coreos/etcd/mvcc/mvccpb"
    "ketang/netWork/0604_Socket/Tool"
)

var (
    dialTimeout = 5 * time.Second
    requestTimeout = 2 * time.Second
    endPoints = []string{"127.0.0.1:2379"} //etcd 默认接受数据的端口2379
)
//添加 删除 查找 前缀 延时

var etcd *clientv3.Client
func main()  {
    fmt.Println(Tool.GetLocalIp())
    var err error
    etcd, err =clientv3.New(clientv3.Config{
        Endpoints:endPoints,
        DialTimeout:dialTimeout,


    })
    if err != nil {
        fmt.Println(err)
    }

    //添加
    err = putValue("a", "abc")
    fmt.Println(err)

    //查找
    result := getValue("a")
    fmt.Println(result)

    //删除
    cnt := delValue("a")
    fmt.Println("delete:", cnt)


    err = putValue("b1", "abc1")
    err = putValue("b2", "abc2")
    err = putValue("b3", "abc3")

    //按前缀查找
    result = getValueWIthPrefix("b")
    fmt.Println(result)
    for _,item := range result {
        fmt.Println(string(item.Key),string(item.Value))
    }

    //按前缀删除
    cnt2 := delValueWithPrefix("b")
    fmt.Println("批量删除:", cnt2)



    //事务处理
    putValue("user1", "zhangsan")
    _,err = etcd.Txn(context.TODO()).
        If(clientv3.Compare(clientv3.Value("user1"),"=", "zhangsan")).
        Then(clientv3.OpPut("user1", "zhangsan")).
        Else(clientv3.OpPut("user1", "lisi")).Commit()

    fmt.Println(err)
    result = getValue("user1")
    fmt.Println("user1:", string(result[0].Value))


    //lease 设置有效时间
    resp, err:= etcd.Grant(context.TODO(), 1)
    _,err = etcd.Put(context.TODO(), "username","wangwu",clientv3.WithLease(resp.ID))

    time.Sleep(3 * time.Second)

    v := getValue("username")
    fmt.Println("lease:",v)


    //watch监听的使用
    putValue("w", "hello")
    go func() {
        rch := etcd.Watch(context.Background(),"w")
        for wresp := range  rch {
            for _,ev := range wresp.Events {
                fmt.Printf("watch>>w  %s %q %q\n", ev.Type,ev.Kv, ev.Kv)
            }
        }
    }()

    putValue("w", "hello world!")



    //监听某个key在一定范围内 value的变化
    //putValue("fo0", "a")
    go func() {
        //监听范围 [fo0-fo3)
        rch := etcd.Watch(context.Background(), "fo0", clientv3.WithRange("fo3"))

        for wresp := range  rch {
            for _,ev := range wresp.Events {
                fmt.Printf("watch range  --   %s %q %q\n", ev.Type,ev.Kv, ev.Kv)
            }
        }
    }()

    putValue("fo0", "b")
    putValue("fo1", "b")
    putValue("fo2", "c")
    putValue("fo2.5", "c")
    putValue("fo3", "c")


    time.Sleep(10 * time.Second)

}


//添加键值对
func putValue(key, value string)  error  {
    _, err := etcd.Put(context.TODO(),key, value)
    return err
}

//查询
func getValue(key string) []*mvccpb.KeyValue  {
    resp, err := etcd.Get(context.TODO(), key)
    if err != nil {
        return  nil
    } else {
        return resp.Kvs
    }
}

// 返回删除了几条数据
func delValue(key string) int64  {
    res,err := etcd.Delete(context.TODO(),key)
    if err != nil {
        return 0
    } else {
        return res.Deleted
    }

}


//按照前缀删除
func delValueWithPrefix(prefix string) int64  {
    res,err := etcd.Delete(context.TODO(),prefix,clientv3.WithPrefix())
    if err != nil {
        fmt.Println(err)
        return 0
    } else {
        return res.Deleted
    }
}

func getValueWIthPrefix(prefix string) []*mvccpb.KeyValue {
    resp, err := etcd.Get(context.TODO(), prefix, clientv3.WithPrefix())
    if err != nil {
        return  nil
    } else {
        return resp.Kvs
    }
}

使用总结

conn.(clientv3.Client).Close()

cannot call pointer method on conn.(clientv3.Client)

需要使用指针

conn.(*clientv3.Client).Close()

原理

主要使用的raft协议实现,可以查看这里