Go Kafka客户端简单示例
客户端
生产者
库
1.sarama目前使用最多的golang的client
go get github.com/Shopify/sarama
该库要求kafka版本在0.8及以上,支持kafka定义的high-level API和low-level API,但不支持常用的consumer自动rebalance和offset追踪,所以一般得结合cluster版本使用。
2.sarama-cluster依赖库,弥补了上面了不足
go get github.com/bsm/sarama-cluster
需要kafka 0.9及以上版本
生产模式
同步消息模式
import (
"github.com/Shopify/sarama"
"time"
"log"
"fmt"
"os"
"os/signal"
"sync"
)
var Address = []string{"10.130.138.164:9092","10.130.138.164:9093","10.130.138.164:9094"}
func main() {
syncProducer(Address)
//asyncProducer1(Address)
}
//同步消息模式
func syncProducer(address []string) {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Timeout = 5 * time.Second
p, err := sarama.NewSyncProducer(address, config)
if err != nil {
log.Printf("sarama.NewSyncProducer err, message=%s \n", err)
return
}
defer p.Close()
topic := "test"
srcValue := "sync: this is a message. index=%d"
for i:=0; i<10; i++ {
value := fmt.Sprintf(srcValue, i)
msg := &sarama.ProducerMessage{
Topic:topic,
Value:sarama.ByteEncoder(value),
}
part, offset, err := p.SendMessage(msg)
if err != nil {
log.Printf("send message(%s) err=%s \n", value, err)
}else {
fmt.Fprintf(os.Stdout, value + "发送成功,partition=%d, offset=%d \n", part, offset)
}
time.Sleep(2*time.Second)
}
}
同步生产者发送消息,使用的不是channel,并且SendMessage方法有三个返回的值,分别为这条消息的被发送到了哪个partition,处于哪个offset,是否有error。
也就是说,只有在消息成功的发送并写入了broker,才会有返回值。
异步消息之Goroutines
异步消费者(Goroutines):用不同的goroutine异步读取Successes和Errors channel
func asyncProducer1(address []string) {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
//config.Producer.Partitioner = 默认为message的hash
p, err := sarama.NewAsyncProducer(address, config)
if err != nil {
log.Printf("sarama.NewSyncProducer err, message=%s \n", err)
return
}
//Trap SIGINT to trigger a graceful shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
var wg sync.WaitGroup
var enqueued, successes, errors int
wg.Add(2) //2 goroutine
// 发送成功message计数
go func() {
defer wg.Done()
for range p.Successes() {
successes++
}
}()
// 发送失败计数
go func() {
defer wg.Done()
for err := range p.Errors() {
log.Printf("%s 发送失败,err:%s\n", err.Msg, err.Err)
errors++
}
}()
// 循环发送信息
asrcValue := "async-goroutine: this is a message. index=%d"
var i int
Loop:
for {
i++
value := fmt.Sprintf(asrcValue, i)
msg := &sarama.ProducerMessage{
Topic:"test",
Value:sarama.ByteEncoder(value),
}
select {
case p.Input() <- msg: // 发送消息
enqueued++
fmt.Fprintln(os.Stdout, value)
case <-signals: // 中断信号
p.AsyncClose()
break Loop
}
time.Sleep(2 * time.Second)
}
wg.Wait()
fmt.Fprintf(os.Stdout, "发送数=%d,发送成功数=%d,发送失败数=%d \n", enqueued, successes, errors)
}
异步生产者使用channel接收(生产成功或失败)的消息,并且也通过channel来发送消息,这样做通常是性能最高的。
异步消息之Select
异步消费者(Select):同一线程内,通过select同时发送消息 和 处理errors计数。该方式效率较低,如果有大量消息发送, 很容易导致success和errors的case无法执行,从而阻塞一定时间。
当然可以通过设置config.Producer.Return.Successes=false;config.Producer.Return.Errors=false来解决
func asyncProducer2(address []string) {
config := sarama.NewConfig()
config.Producer.Return.Errors = true
p, err := sarama.NewAsyncProducer(address, config)
if err != nil {
log.Printf("sarama.NewSyncProducer err, message=%s \n", err)
return
}
//Trap SIGINT to trigger a graceful shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
var enqueued, successes, errors int
asrcValue := "async-select: this is a message. index=%d"
var i int
Loop:
for {
i++
value := fmt.Sprintf(asrcValue, i)
msg := &sarama.ProducerMessage{
Topic:"test",
Value:sarama.ByteEncoder(value),
}
select {
case p.Input() <- msg:
fmt.Fprintln(os.Stdout, value)
enqueued++
case <-p.Successes():
successes++
case err := <-p.Errors():
log.Printf("%s 发送失败,err:%s\n", err.Msg, err.Err)
errors++
case <-signals:
p.AsyncClose()
break Loop
}
time.Sleep(2 * time.Second)
}
fmt.Fprintf(os.Stdout, "发送数=%d,发送失败数=%d \n", enqueued, errors)
}
注意事项
我们在来看看Shopify/sarama的producer有两种运行模式的一些注意的的地方
同步模式:producer把消息发给kafka之后会等待结果返回。
config := sarama.NewConfig()
config.Producer.Return.Successes = true
client, err := sarama.NewClient([]{"localhost:9092"}, config)
if err != nil {
log.Fatalf("unable to create kafka client: %q", err)
}
producer, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
log.Fatalf("unable to create kafka producer: %q", err)
}
defer producer.Close()
text := fmt.Sprintf("message %08d", i)
partition, offset , err := producer.SendMessage(&sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)})
if err != nil {
log.Fatalf("unable to produce message: %q", err)
}
...
注意同步模式下,下面配置必须置上:
config.Producer.Return.Successes = true
否则运行报错:
2018/12/25 08:08:30 unable to create kafka producer: "kafka:
invalid configuration (Producer.Return.Successes must be true to be used in a SyncProducer)"
异步模式:producer把消息发给kafka之后不会等待结果返回。
异步模式,顾名思义就是produce一个message之后不等待发送完成返回;这样调用者可以继续做其他的工作。
config := sarama.NewConfig()
// config.Producer.Return.Successes = true
client, err := sarama.NewClient([]{"localhost:9092"}, config)
if err != nil {
log.Fatalf("unable to create kafka client: %q", err)
}
producer, err := sarama.NewAsyncProducerFromClient
if err != nil {
log.Fatalf("unable to create kafka producer: %q", err)
}
defer producer.Close()
text := fmt.Sprintf("message %08d", i)
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
// wait response
select {
//case msg := <-producer.Successes():
// log.Printf("Produced message successes: [%s]\n",msg.Value)
case err := <-producer.Errors():
log.Println("Produced message failure: ", err)
default:
log.Println("Produced message default",)
}
...
异步模式produce一个消息后,缺省并不会报告成功状态,需要打开返回配置。
config.Producer.Return.Successes = false
...
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf("Produced message: [%s]\n",text)
// wait response
select {
case msg := <-producer.Successes():
log.Printf("Produced message successes: [%s]\n",msg.Value)
case err := <-producer.Errors():
log.Println("Produced message failure: ", err)
}
则这段代码会挂住,因为设置没有要求返回成功config.Producer.Return.Successes = false,那么在select等待的时候producer.Successes()不会返回,producer.Errors()也不会返回(假设没有错误发生),就挂在这儿。当然可以加一个default分支绕过去,就不会挂住了:
select {
case msg := <-producer.Successes():
log.Printf("Produced message successes: [%s]\n",msg.Value)
case err := <-producer.Errors():
log.Println("Produced message failure: ", err)
default:
log.Println("Produced message default")
}
如果打开了Return.Successes配置,则上述代码段等同于同步方式
config.Producer.Return.Successes = true
...
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf("Produced message: [%s]\n",text)
// wait response
select {
case msg := <-producer.Successes():
log.Printf("Produced message successes: [%s]\n",msg.Value)
case err := <-producer.Errors():
log.Println("Produced message failure: ", err)
}
从log可以看到,每发送一条消息,收到一条Return.Successes,类似于:
2018/12/25 08:51:51 Produced message: [message 00002537]
2018/12/25 08:51:51 Produced message successes: [message 00002537]
2018/12/25 08:51:51 Produced message: [message 00002538]
2018/12/25 08:51:51 Produced message successes: [message 00002538]
2018/12/25 08:51:51 Produced message: [message 00002539]
2018/12/25 08:51:51 Produced message successes: [message 00002539]
2018/12/25 08:51:51 Produced message: [message 00002540]
2018/12/25 08:51:51 Produced message successes: [message 00002540]
2018/12/25 08:51:51 Produced message: [message 00002541]
2018/12/25 08:51:51 Produced message successes: [message 00002541]
2018/12/25 08:51:51 Produced message: [message 00002542]
2018/12/25 08:51:51 Produced message successes: [message 00002542]
2018/12/25 08:51:51 Produced message: [message 00002543]
2018/12/25 08:51:51 Produced message successes: [message 00002543]
...
就像是同步produce一样的行为了。
如果打开了Return.Successes配置,而又没有producer.Successes()提取,那么Successes()这个chan消息会被写满。
config.Producer.Return.Successes = true
...
log.Printf("Reade to Produced message: [%s]\n",text)
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf("Produced message: [%s]\n",text)
// wait response
select {
//case msg := <-producer.Successes():
// log.Printf("Produced message successes: [%s]\n",msg.Value)
case err := <-producer.Errors():
log.Println("Produced message failure: ", err)
default:
log.Println("Produced message default",)
}
写满的结果就是不能再写入了,导致后面的Return.Successes消息丢失, 而且producer也会挂住,因为共享的buffer被占满了,大量的Return.Successes没有被消耗掉。
运行一段时间后:
2018/12/25 08:58:24 Reade to Produced message: [message 00000603]
2018/12/25 08:58:24 Produced message: [message 00000603]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000604]
2018/12/25 08:58:24 Produced message: [message 00000604]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000605]
2018/12/25 08:58:24 Produced message: [message 00000605]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000606]
2018/12/25 08:58:24 Produced message: [message 00000606]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000607]
2018/12/25 08:58:24 Produced message: [message 00000607]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000608]
在produce第00000608个message的时候被挂住了,因为消息缓冲满了;这个缓冲的大小是可配的(可能是这个MaxRequestSize?),但是不管大小是多少,如果没有去提取Success消息最终都会被占满的。
结论就是说配置config.Producer.Return.Successes = true和操作<-producer.Successes()必须配套使用;配置成true,那么就要去读取Successes,如果配置成false,则不能去读取Successes。
重要配置参数
1、MaxMessageBytes int
这个参数影响了一条消息的最大字节数,默认是1000000。但是注意,这个参数必须要小于broker中的 message.max.bytes。
2、RequiredAcks RequiredAcks
这个参数影响了消息需要被多少broker写入之后才返回。取值可以是0、1、-1
- 1代表了不需要等待broker确认才返回、这样最容易丢失消息但同时性能却是最好的
- 0代表需要分区的leader确认后才返回,这是一种折中的方案,它会等待副本 Leader 响应,但不会等到 follower 的响应。一旦 Leader 挂掉消息就会丢失。但性能和消息安全性都得到了一定的保证。
- -1代表需要分区的所有副本确认后返回这样可以保证消息不会丢失,但同时性能和吞吐量却是最低的。
3、Partitioner PartitionerConstructor
这个是分区器。Sarama默认提供了几种分区器,如果不指定默认使用Hash分区器。
4、Retry
这个参数代表了重试的次数,以及重试的时间,主要发生在一些可重试的错误中。
在重试过程中需要考虑幂等操作了,比如当同时发送了2个请求,如果第一个请求发送到broker中,broker写入失败了,但是第二个请求写入成功了,那么客户端将重新发送第一个消息的请求,这个时候会造成乱序。又比如当第一个请求返回acks的时候,因为网络原因,客户端没有收到,所以客户端进行了重发,这个时候就会造成消息的重复。
所以,幂等生产者就是为了保证消息发送到broker中是有序且不重复的。一共有两个参数
5、MaxOpenRequests int
这个参数代表了允许没有收到acks而可以同时发送的最大batch数。
6、Idempotent bool
用于幂等生产者,当这一项设置为true的时候,生产者将保证生产的消息一定是有序且精确一次的。
7、Flush
用于设置将消息打包发送,简单来讲就是每次发送消息到broker的时候,不是生产一条消息就发送一条消息,而是等消息累积到一定的程度了,再打包发送。所以里面含有两个参数。一个是多少条消息触发打包发送,一个是累计的消息大小到了多少,然后发送。
8、Compression
压缩数据进行发送,选择不同支持的压缩方式,也可以不压缩,压缩比较消耗资源,不压缩可以提高速度。
源码解析
创建过程
producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
一切都从这么一行开始讲起。在这里其实就只有两个部分,先是通过地址和配置,构建一个 client 。
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
// 构建client
client, err := NewClient(addrs, conf)
if err != nil {
return nil, err
}
// 构建AsyncProducer
return newAsyncProducer(client)
}
Client的创建
先构建一个 client 结构体。然后创建完之后,刷新元数据,并且启动一个协程,在后台进行刷新。
func NewClient(addrs []string, conf *Config) (Client, error) {
...
// 构建一个client
client := &client{
conf: conf,
closer: make(chan none),
closed: make(chan none),
brokers: make(map[int32]*Broker),
metadata: make(map[string]map[int32]*PartitionMetadata),
metadataTopics: make(map[string]none),
cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
coordinators: make(map[string]int32),
}
// 把用户输入的broker地址作为“种子broker”增加到seedBrokers中
// 随后客户端会根据已有的broker地址,自动刷新元数据,以获取更多的broker地址
// 所以称之为种子
random := rand.New(rand.NewSource(time.Now().UnixNano()))
for _, index := range random.Perm(len(addrs)) {
client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
}
...
// 启动协程在后台刷新元数据
go withRecover(client.backgroundMetadataUpdater)
return client, nil
}
元数据的更新
后台更新元数据的设计其实很简单,利用一个 ticker ,按时对元数据进行更新,直到 client 关闭。
func (client *client) backgroundMetadataUpdater() {
// 按照配置的时间更新元数据
ticker := time.NewTicker(client.conf.Metadata.RefreshFrequency)
defer ticker.Stop()
// 循环获取channel,判断是执行更新操作还是终止
for {
select {
case <-ticker.C:
if err := client.refreshMetadata(); err != nil {
Logger.Println("Client background metadata update:", err)
}
case <-client.closer:
return
}
}
}
然后我们继续来看看 client.refreshMetadata() 这个方法,在这里我们设置了需要刷新元数据的主题,重试的次数,超时的时间。
func (client *client) RefreshMetadata(topics ...string) error {
deadline := time.Time{}
if client.conf.Metadata.Timeout > 0 {
deadline = time.Now().Add(client.conf.Metadata.Timeout)
}
// 设置参数
return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
}
再看tryRefreshMetadata这个方法。在这个方法中,会选取已经存在的broker,构造获取元数据的请求。在收到回应后,如果不存在任何的错误,就将这些元数据用于更新客户端。
func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
...
broker := client.any()
for ; broker != nil && !pastDeadline(0); broker = client.any() {
...
req := &MetadataRequest{
Topics: topics,
// 是否允许创建不存在的主题
AllowAutoTopicCreation: allowAutoTopicCreation
}
response, err := broker.GetMetadata(req)
switch err.(type) {
case nil:
allKnownMetaData := len(topics) == 0
// 对元数据进行更新
shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
if shouldRetry {
Logger.Println("client/metadata found some partitions to be leaderless")
return retry(err)
}
return err
case ...
...
}
}
当客户端拿到了 response 之后,首先,先对本地保存 broker 进行更新。然后,对 topic 进行更新,以及这个 topic 下面的那些 partition 。
func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
...
// 假设返回了新的broker id,那么保存这些新的broker,这意味着增加了broker、或者下线的broker重新上线了
// 如果返回的id我们已经保存了,但是地址变化了,那么更新地址
// 如果本地保存的一些id没有返回,说明这些broker下线了,那么删除他们
client.updateBroker(data.Brokers)
// 然后对topic也进行元数据的更新
// 主要是更新topic以及topic对应的partition
for _, topic := range data.Topics {
...
// 更新每个topic以及对应的partition
client.metadata[topic.Name] = make(map[int32]*PartitionMetadata, len(topic.Partitions))
for _, partition := range topic.Partitions {
client.metadata[topic.Name][partition.ID] = partition
...
}
}
与Broker建立连接
主要是通过broker := client.any()来实现的。
func (client *client) any() *Broker {
...
if len(client.seedBrokers) > 0 {
_ = client.seedBrokers[0].Open(client.conf)
return client.seedBrokers[0]
}
// 不保证一定是按顺序的
for _, broker := range client.brokers {
_ = broker.Open(client.conf)
return broker
}
return nil
}
Open方法异步的建立了一个tcp连接,然后创建了一个缓冲大小为MaxOpenRequests的channel。
func (b *Broker) Open(conf *Config) error {
if conf == nil {
conf = NewConfig()
}
...
go withRecover(func() {
...
dialer := conf.getDialer()
b.conn, b.connErr = dialer.Dial("tcp", b.addr)
...
b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
...
go withRecover(b.responseReceiver)
})
这个名为 responses 的 channel ,用于接收从 broker发送回来的消息。然后,又启动了一个协程,用于接收消息。
从Broker接收响应
当 broker 收到一个 response 的时候,先解析消息的头部,然后再解析消息的内容。并把这些内容写进 response 的 packets 中。
func (b *Broker) responseReceiver() {
for response := range b.responses {
...
// 先根据Header的版本读取对应长度的Header
var headerLength = getHeaderLength(response.headerVersion)
header := make([]byte, headerLength)
bytesReadHeader, err := b.readFull(header)
decodedHeader := responseHeader{}
err = versionedDecode(header, &decodedHeader, response.headerVersion)
...
// 解析具体的内容
buf := make([]byte, decodedHeader.length-int32(headerLength)+4)
bytesReadBody, err := b.readFull(buf)
// 省略了一些错误处理,总之,如果发生了错误,就把错误信息写进 response.errors 中
response.packets <- buf
}
}
发送与接受消息
我们回到这一行代码:
response, err := broker.GetMetadata(req)
我们直接进去,之前看的是元数据的处理,其实也可以用于发送接收消息。发现在这里构造了一个接受返回信息的结构体,然后调用了sendAndReceive方法。
func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
response := new(MetadataResponse)
err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}
return response, nil
}
在这里我们可以看到,先是调用了send方法,然后返回了一个promise。并且当有消息写入这个promise的时候,就得到了结果。
而且回想一下我们在receiver中,是不是把获取到的 response 写进了 packets ,把错误结果写进了 errors 呢,跟这里是一致的
func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
responseHeaderVersion := int16(-1)
if res != nil {
responseHeaderVersion = res.headerVersion()
}
promise, err := b.send(req, res != nil, responseHeaderVersion)
if err != nil {
return err
}
if promise == nil {
return nil
}
// 这里的promise,是上面send方法返回的
select {
case buf := <-promise.packets:
return versionedDecode(buf, res, req.version())
case err = <-promise.errors:
return err
}
}
在send方法中,把需要发送的消息通过与broker的tcp连接,同步发送到broker中。
然后构建了一个responsePromise类型的channel,然后直接将这个结构体丢进这个channel中。然后回想一下,我们在responseReceiver这个方法中,不断消费接收到的response。
此时在responseReceiver中,收到了send方法传递的responsePromise,他就会通过conn来读取数据,然后将数据写入这个responsePromise的packets中,或者将错误信息写入errors中。
而此时,再看看send方法,他返回了这个responsePromise的指针。所以,sendAndReceive方法就在等待这个responsePromise内的packets或者errors的channel被写入数据。当responseReceiver接收到了响应并且写入数据的时候,packets或者errors就会被写入消息。
func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
...
// 将请求的内容封装进 request ,然后发送到Broker中
// 注意一下这里的 b.write(buf)
// 里面做了 b.conn.Write(buf) 这件事情
req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
buf, err := encode(req, b.conf.MetricRegistry)
bytes, err := b.write(buf)
...
// 如果我们的response为nil,也就是说当不需要response的时候,是不会放进inflight发送队列的
if !promiseResponse {
// Record request latency without the response
b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
return nil, nil
}
// 构建一个接收响应的 channel ,返回这个channel的指针
// 这个 channel 内部包含了两个 channel,一个用来接收响应,一个用来接收错误
promise := responsePromise{requestTime, req.correlationID, responseHeaderVersion, make(chan []byte), make(chan error)}
b.responses <- promise
// 这里返回指针特别的关键,是把消息的发送跟消息的接收联系在一起了
return &promise, nil
}
让我们来用一张图说明一下上面这个发送跟接收的过程:
AsyncProcuder
AsyncProcuder是如何发送消息的。我们从newAsyncProducer(client)这一行开始讲起。
func newAsyncProducer(client Client) (AsyncProducer, error) {
...
p := &asyncProducer{
client: client,
conf: client.Config(),
errors: make(chan *ProducerError),
input: make(chan *ProducerMessage),
successes: make(chan *ProducerMessage),
retries: make(chan *ProducerMessage),
brokers: make(map[*Broker]*brokerProducer),
brokerRefs: make(map[*brokerProducer]int),
txnmgr: txnmgr,
}
go withRecover(p.dispatcher)
go withRecover(p.retryHandler)
}
先是构建了asyncProducer结构体,然后协程启动的go withRecover(p.dispatcher)。
func (p *asyncProducer) dispatcher() {
handlers := make(map[string]chan<- *ProducerMessage)
...
for msg := range p.input {
...
// 拦截器
for _, interceptor := range p.conf.Producer.Interceptors {
msg.safelyApplyInterceptor(interceptor)
}
...
// 找到这个Topic对应的Handler
handler := handlers[msg.Topic]
if handler == nil {
// 如果此时还不存在这个Topic对应的Handler,那么创建一个
// 虽然说他叫Handler,但他其实是一个无缓冲的
handler = p.newTopicProducer(msg.Topic)
handlers[msg.Topic] = handler
}
// 然后把这条消息写进这个Handler中
handler <- msg
}
}
在这个方法中,首先创建了一个以Topic为key的map,这个map的value是无缓冲的channel。
到这里我们很容易可以推测得出,当通过input发送一条消息的时候,消息会到dispatcher这里,被分配到各个Topic中。
然后让我们来handler = p.newTopicProducer(msg.Topic)这一行的代码。
在这里创建了一个缓冲大小为ChannelBufferSize的channel,用于存放发送到这个主题的消息。
然后创建了一个topicProducer,在这个时候你可以认为消息已经交付给各个topic的topicProducer了。
func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
tp := &topicProducer{
parent: p,
topic: topic,
input: input,
breaker: breaker.New(3, 1, 10*time.Second),
handlers: make(map[int32]chan<- *ProducerMessage),
partitioner: p.conf.Producer.Partitioner(topic),
}
go withRecover(tp.dispatch)
return input
}
然后我们来看看go withRecover(tp.dispatch)这一行代码。同样是启动了一个协程,来处理消息。
也就是说,到了这一步,对于每一个Topic,都有一个协程来处理消息。
在这个dispatch()方法中,也同样的接收到一条消息,就会去找这条消息所在的分区的channel,然后把消息写进去。
func (tp *topicProducer) dispatch() {
for msg := range tp.input {
...
// 同样是找到这条消息所在的分区对应的channel,然后把消息丢进去
handler := tp.handlers[msg.Partition]
if handler == nil {
handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
tp.handlers[msg.Partition] = handler
}
handler <- msg
}
}
我们进tp.parent.newPartitionProducer(msg.Topic, msg.Partition)这里看看。
你可以发现partitionProducer跟topicProducer是很像的。
其实他们就是代表了一条消息的分发,从producer到topic到partition。
注意,这里面的channel缓冲大小,也是ChannelBufferSize。
func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
pp := &partitionProducer{
parent: p,
topic: topic,
partition: partition,
input: input,
breaker: breaker.New(3, 1, 10*time.Second),
retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
}
go withRecover(pp.dispatch)
return input
}
到了这一步,我们再来看看消息到了每个partition所在的channel,是如何处理的。
其实在这一步中,主要是做一些错误处理之类的,然后把消息丢进brokerProducer。
可以理解为这一步是业务逻辑层到网络IO层的转变,在这之前我们只关心消息去到了哪个分区,而在这之后,我们需要找到这个分区所在的broker的地址,并使用之前已经建立好的TCP连接,发送这条消息。
func (pp *partitionProducer) dispatch() {
// 找到这个主题和分区的leader所在的broker
pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
// 如果此时找到了这个leader
if pp.leader != nil {
pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
pp.parent.inFlight.Add(1)
// 发送一条消息来表示同步
pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
}
...// 各种异常情况
// 然后把消息丢进brokerProducer中
pp.brokerProducer.input <- msg
}
到了这里,大概算是整个发送流程最后的一个步骤了。
我们来看看pp.parent.getBrokerProducer(pp.leader)这行代码里面的内容。
其实就是找到asyncProducer中的brokerProducer,如果不存在,则创建一个。
func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
p.brokerLock.Lock()
defer p.brokerLock.Unlock()
bp := p.brokers[broker]
if bp == nil {
bp = p.newBrokerProducer(broker)
p.brokers[broker] = bp
p.brokerRefs[bp] = 0
}
p.brokerRefs[bp]++
return bp
}
那我们就来看看brokerProducer是怎么创建出来的。
看这个方法中启动的第二个协程,我们可以推测bridge这个channel收到消息后,会把收到的消息打包成一个request,然后调用Produce方法。
并且,将返回的结果的指针地址,写进response中。
然后构造好brokerProducerResponse,并且写入responses中。
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
var (
input = make(chan *ProducerMessage)
bridge = make(chan *produceSet)
responses = make(chan *brokerProducerResponse)
)
bp := &brokerProducer{
parent: p,
broker: broker,
input: input,
output: bridge,
responses: responses,
stopchan: make(chan struct{}),
buffer: newProduceSet(p),
currentRetries: make(map[string]map[int32]error),
}
go withRecover(bp.run)
// minimal bridge to make the network response `select`able
go withRecover(func() {
for set := range bridge {
request := set.buildRequest()
response, err := broker.Produce(request)
responses <- &brokerProducerResponse{
set: set,
err: err,
res: response,
}
}
close(responses)
})
if p.conf.Producer.Retry.Max <= 0 {
bp.abandoned = make(chan struct{})
}
return bp
}
让我们再来看看broker.Produce(request)这一行代码。
是不是很熟悉呢,我们在client部分讲到的sendAndReceive方法。
而且我们可以发现,如果我们设置了需要Acks,就会返回一个response;如果没设置,那么消息发出去之后,就不管了。
此时在获取了response,并且填入了response的内容后,返回这个response的内容。
func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
var (
response *ProduceResponse
err error
)
if request.RequiredAcks == NoResponse {
err = b.sendAndReceive(request, nil)
} else {
response = new(ProduceResponse)
err = b.sendAndReceive(request, response)
}
if err != nil {
return nil, err
}
return response, nil
}
至此,Sarama生产者相关的内容就介绍完毕了。
syncProducer 和asyncProducer的关系
syncProducer 是所有功能都是由asyncProducer实现的,而syncProducer 之所以可以同步发送消息,答案就在SendMessage 函数中,源码如下
func(sp *syncProducer)SendMessage(msg *ProducerMessage) (partitionint32,offsetint64,errerror) {
expectation :=make(chan*ProducerError,1)
msg.expectation = expectation
sp.producer.Input() <- msg
if err := <-expectation;err != nil { // 阻塞等待返回结果
return-1,-1,err.Err
}
return msg.Partition,msg.Offset,nil
}
而使用asyncProducer 时,只需要 直接将信息producer.Input()<-&ProducerMessage{} 放入进producer.Input(), 然后异步读取返回结果 chan*ProducerError
消息传递过程
// one per topic
// partitions messages, then dispatches them by partition
type topicProducer struct{
parent *asyncProducer
topic string
input <-chan*ProducerMessage
breaker *breaker.Breaker
handlers map[int32] chan<- *ProducerMessage
partitioner Partitioner
}
type brokerProducer struct{
parent *asyncProducer
broker *Broker
input <-chan*ProducerMessage
output chan<- *produceSet
responses <-chan*brokerProducerResponse
buffer *produceSet
timer <-chantime.Time
timerFired bool
closing error
currentRetries map[string]map[int32]error
}
由代码可以看出topicProducer,partitionProducer,brokerProducer的parent都是asyncProducer
消息传递过程:
asyncProducer.dispatcher ->topicProducer.dispath -> partitionProducer.dispatch -> brokerProducer ->produceSet
其中produceSet 对消息进行聚集,若配置了压缩的参数,则会压缩一个set中的所有的msg, 即批量压缩, 然后构建一个ProduceRequest ,然后由 broker.Produce 将请求发送出去,其中 broker 结构体代表一个kafka broker 的连接
partitionProducer 会选择leader broker地址 ,若选择失败,则会重新选择leader broker ,然后由这个连接发送消息根据kafka版本不同,消息会放入到不同的结构体中若版本大于V0.11,set.recordsToSend.RecordBatch.addRecord(rec) 将一个rec添加进去,否则将set.recordsToSend.MsgSet.addMessage(msgToSend)
在生成一个newBrokerProducer时,broker会开启消费output, 而output就是一个存放produceSet的channel,阻塞等待刷新ProduceRequest 并将其发送出去
消费者
消费者集群模实现
func main() {
topic := []string{"test"}
var wg = &sync.WaitGroup{}
wg.Add(2)
//广播式消费:消费者1
go clusterConsumer(wg, Address, topic, "group-1")
//广播式消费:消费者2
go clusterConsumer(wg, Address, topic, "group-2")
wg.Wait()
}
// 支持brokers cluster的消费者
func clusterConsumer(wg *sync.WaitGroup,brokers, topics []string, groupId string) {
defer wg.Done()
config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true
config.Consumer.Offsets.Initial = sarama.OffsetNewest
// init consumer
consumer, err := cluster.NewConsumer(brokers, groupId, topics, config)
if err != nil {
log.Printf("%s: sarama.NewSyncProducer err, message=%s \n", groupId, err)
return
}
defer consumer.Close()
// trap SIGINT to trigger a shutdown
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
// consume errors
go func() {
for err := range consumer.Errors() {
log.Printf("%s:Error: %s\n", groupId, err.Error())
}
}()
// consume notifications
go func() {
for ntf := range consumer.Notifications() {
log.Printf("%s:Rebalanced: %+v \n", groupId, ntf)
}
}()
// consume messages, watch signals
var successes int
Loop:
for {
select {
case msg, ok := <-consumer.Messages():
if ok {
fmt.Fprintf(os.Stdout, "%s:%s/%d/%d\t%s\t%s\n", groupId, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
consumer.MarkOffset(msg, "") // mark message as processed
successes++
}
case <-signals:
break Loop
}
}
fmt.Fprintf(os.Stdout, "%s consume %d messages \n", groupId, successes)
}
如何优雅的使用 Kafka 生产者
发送流程
- 初始化以及真正发送消息的 kafka-producer-network-thread IO 线程。
- 将消息序列化。
- 得到需要发送的分区。
- 写入内部的一个缓存区中。
- 初始化的 IO 线程不断的消费这个缓存来发送消息。
分区策略
- 指定分区
- 自定义路由
- 默认策略,通常都是使用这种策略,其实就是一种对分区的轮询简单的来说分为以下几步:
- 获取 Topic 分区数。
- 将内部维护的一个线程安全计数器 +1。
- 与分区数取模得到分区编号。
其实这就是很典型的轮询算法,所以只要分区数不频繁变动这种方式也会比较均匀。
性能
生产速度
12c32G1000M--3台
单个生产者线程,单副本--821557个记录/秒(78.3 MB /秒)
单个生产者线程,三个副本,异步方式--786980 record / sec(75.1 MB / sec)
单个生产者线程,3个副本,同步复制----428823条记录/秒(40.2 MB /秒)
三个生产者,3个副本,异步复制----2024032个记录/秒(193.0 MB /秒)
问题
golang连接kafka(sarama)内存暴涨问题记录
问题背景
使用kafka客户端类库(sarama)步发布消息, qps为100+, 上线后内存,cpu爆炸。
排查过程
- 首先排查代码层面写的有逻辑bug, 比如连接未close, 排查无问题
- 排查发布的消息较大, 导致golang频繁gc, 和同事确认,无频繁gc
- 通过查看源码,发现每次http请求,操作kafka都是短链接, 即频繁的会新建短链接, 排查到这里,还是不能特别确认是因为短链接导致, 因为之前接入rabbitmq类库,也是使用的短链接。
- 使用pprof打印出火焰图, profile, 和block的, 也没发现特别大的bug点。
- 使用sarama meory搜索官方issue, 和谷歌查询。 得到出具体结论
分析具体问题
从官方issue 得知, sarama类库自动依赖第三方的统计类库go-mertic, 主要是为了方便给prometheus统计数据。 sarama类库默认打开。导致该统计站的内存,迟迟未释放
因此使用sarama前, 将该统计关闭即可。
对应代码:
metrics.UseNilMetrics = true