欢迎光临
我们一直在努力

golang异步kafka生产者

在实际业务场景中,为了提高系统的实时性,减轻日志存储压力,需要将日志直接生产至消息中间件,减少flume或flumted收集所导致的延时及性能压力,本文实现了一下功能:
实现了一个静态调用的异步生产者    AsyncProducer
封装了一个用于异步发送的生产器    Agent

//@description	kafka代理
//@author chenbintao
//@data	2017-09-27	10:30	初稿
//		2017-09-27	11:15	规范代码
//		2017-09-28	14:15	对发送逻辑进行了优化

package kafkaAgent

import (
    "fmt"
    "log"
    "runtime/debug"
    "strings"
    "time"

    "github.com/Shopify/sarama"
)

const (
    _BROKER_LIST_ = `localhost:9092`
)
const (
    _LABEL_ = "[_kafkaAgent_]"
)

var (
    IS_DEBUG = false
    _PAUSE_  = false
)

func SetDebug(debug bool) {
    IS_DEBUG = debug
}

type Agent struct {
    flag           bool
    BrokerList     string
    TopicList      string
    SendTimeOut    time.Duration
    ReceiveTimeOut time.Duration
    AsyncProducer  sarama.AsyncProducer
}

func (this *Agent) Set(BrokerList, TopicList string, SendTimeOut, ReceiveTimeOut time.Duration) bool {
    //只允许初始化一次
    if this.flag {

        return false
    }

    this.flag = true
    this.BrokerList = BrokerList
    this.TopicList = TopicList
    this.SendTimeOut = SendTimeOut
    this.ReceiveTimeOut = ReceiveTimeOut
    this.AsyncProducer = getProducer(this.BrokerList, this.SendTimeOut, true)
    if nil == this.AsyncProducer {

        return false
    }

    return this.Check()
}
func (this *Agent) Check() bool {
    if "" == this.BrokerList || "" == this.TopicList {

        return false
    }
    if 0 == this.SendTimeOut && 0 == this.ReceiveTimeOut {

        return false
    }

    return true
}
func (this *Agent) Send(msg string) bool {
    defer func() {
        if e, ok := recover().(error); ok {
            log.Println(_LABEL_, "WARN: panic in %v", e)
            log.Println(_LABEL_, string(debug.Stack()))
            this.AsyncProducer.Close()
            this.AsyncProducer = getProducer(this.BrokerList, this.SendTimeOut, true)
        }
    }()
    if !this.Check() {

        return false
    }

    return asyncProducer(
        this.AsyncProducer,
        this.TopicList,
        msg,
    )
}

//=========================================================================
// asyncProducer 异步生产者
func AsyncProducer(kafka_list, topics, s string, timeout time.Duration) bool {
    if "" == kafka_list || "" == topics {

        return false
    }
    producer := getProducer(kafka_list, timeout, false)
    if nil == producer {

        return false
    }
    defer producer.Close()
    go func(p sarama.AsyncProducer) {
        errors := p.Errors()
        success := p.Successes()
        for {
            select {
            case err := <-errors:
                if err != nil {
                    if IS_DEBUG {
                        log.Println(_LABEL_, err)
                    }
                    return
                } else {
                    return
                }
            case <-success:
                return
            }
        }
    }(producer)

    return asyncProducer(producer, topics, s)
}
func asyncProducer(p sarama.AsyncProducer, topics, s string) bool {
    if nil == p {

        return false
    }

    msg := &sarama.ProducerMessage{
        Topic: topics,
        Value: sarama.ByteEncoder(s),
    }
    p.Input() <- msg

    if IS_DEBUG {
        fmt.Println(_LABEL_, msg)
    }

    return true
}
func getProducer(kafka_list string, timeout time.Duration, monitor bool) sarama.AsyncProducer {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Producer.Timeout = timeout
    producer, err := sarama.NewAsyncProducer(strings.Split(kafka_list, ","), config)
    if err != nil {
        if IS_DEBUG {
            log.Println(_LABEL_, err)
        }
    }
    if monitor {
        //消费状态消息,防止死锁
        go func(producer sarama.AsyncProducer) {
            if nil == producer {
                log.Println(_LABEL_, "getProducer() producer error!")

                return
            }
            errors := producer.Errors()
            success := producer.Successes()
            for {
                select {
                case err := <-errors:
                    if err != nil {
                        if IS_DEBUG {
                            log.Println(_LABEL_, err)
                        }
                        continue
                    } else {
                        continue
                    }
                case <-success:
                    continue
                }
            }
        }(producer)
    }

    return producer
}

 

赞(1) 打赏
转载请注明来源:IT技术资讯 » golang异步kafka生产者

评论 抢沙发

评论前必须登录!

 

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏