Git Product home page Git Product logo

articles's People

Contributors

lzh2nix avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar

articles's Issues

<<Linux性能优化实战>> 综合实战篇 阅读笔记

**史:从火到弗洛伊德

image

一直想读未开始读系列

大全书, 作者还有一本书是<从弗洛伊德到互联网>,一己之力把宏大的人类**史讲完。

人类第一次意识到人类的历史比传说早 绝对是一件了不起的事情,第一次考古发现人类的历史要早与<圣经>里提到的故事,这个问题我们闹补了好几天。当时能够打破权威,表达出独立的声音实在是太厉害了。

mqtt 简介以及paho client 的实现方式

mqtt spec介绍

mqtt 作一种轻量级的通信协议在iot平台中已经成为一种标配, 通过 pub/sub 的方式给两端提供了pub/sub 的功能. pub 主要用于消息的上报(log 信息, 设备运行数据, 对broker 下发指令的相应等); sub 则主要用于云端消息推送, 设备sub 一个指定的topic 当其他端向这个topic 投递消息的时候, broker 就要该消息转发到sub 端. 以下是一个简单的模型.
图片

*”MQTT is a Client Server publish/subscribe messaging transport protocol. It is light weight, open, simple, and designed so as to be easy to implement. These characteristics make it ideal for use in many situations, including constrained environments such as for communication in Machine to Machine (M2M) and Internet of Things (IoT) contexts where a small code footprint is required and/or network bandwidth is at a premium.“ *

  • *以上是spec中对协议的描述, 主要使用于 M2M 以及低功耗和低带宽的场景.从协议层面来看他主要由以下三部分组成:

图片

fixed header 出现在每个 mqtt 的消息中主要用来表示该消息是那种类似的消息, 他整个的大小也就只有2 bytes.
图片

对于特定的 packet 还会有 vairable header 和 paylaod 两部分. variable主要用来表示一些包的属性,比如对于connect 包 vairable header 中会说明 username passowd 是否为空等等, 然后在 payload 中带上具体的 username password.
vairable header 在各种 packet 中的存在情况.
图片
payload 在各种header 中的存在情况
图片

可以看到 对 ping 包来说每次发的包也就只有2 bytes(只有fixed header). 对于 mqtt的深入学习可以参考:

  1. 官方spec 文档 http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html
  2. hivemq 的 essentials 文章 https://www.hivemq.com/mqtt-essentials/

在这文篇文章里我们首先会看下官方实现https://github.com/eclipse/paho.mqtt.golang中 golang 的进程模型, 然后细扣一下对协议的实现以及一些 golang 技巧.

进程模型

我们就从 client.go Connect 函数看起, 在 Connect 中起来另外一个 go routine 来处理一系列的操作,对外直接 返回了token, 这样Connect 也就实现了纯异步操作.

func (c *client) Connect() Token {
	var err error
	t := newToken(packets.Connect).(*ConnectToken)
	DEBUG.Println(CLI, "Connect()")

	c.obound = make(chan *PacketAndToken, c.options.MessageChannelDepth)
	c.oboundP = make(chan *PacketAndToken, c.options.MessageChannelDepth)
	c.ibound = make(chan packets.ControlPacket)

	go func() {
         ...
    }
    return t
}

之后我们来看下里面go routine的具体内容,
图片

在上面我们看到一个client会创建6个go routine, 各个go routine 直接的交互主要是通过client中的以下几个chan完成:

type client struct {
  	ibound          chan packets.ControlPacket
	obound          chan *PacketAndToken
	oboundP         chan *PacketAndToken
    incomingPubChan chan *packets.PublishPacket
	errors          chan error
	stop            chan struct{}
}

下面是各routine 之间的交互图:
图片

一些golang 技巧

优秀的开源项目会让你代码的功力大增, 里面有很多设计和代码上的技巧.

针对消息持久化的抽象

client 的 persist 作为一种Store 类似的interface

type client struct {
	persist         Store
}

Store 的具体实现:

// Store is an interface which can be used to provide implementations
// for message persistence.
// Because we may have to store distinct messages with the same
// message ID, we need a unique key for each message. This is
// possible by prepending "i." or "o." to each message id
type Store interface {
	Open()
	Put(key string, message packets.ControlPacket)
	Get(key string) packets.ControlPacket
	All() []string
	Del(key string)
	Close()
	Reset()

}

然后在 filestore.go 和 memstore.go中个子实现这些接口 就完成了很好的抽象.

PacketAndToken 在个chnnel 之间的穿梭

packet token的主要目的是等待 control packet 对应的ack 消息, 这里抽象出一个baseToken 然后其他token 在这个基础上扩展.

type baseToken struct {
	m        sync.RWMutex
	complete chan struct{}
	err      error
}
type ConnectToken struct {
	baseToken
	returnCode     byte
	sessionPresent bool
}
type ConnectToken struct {
	baseToken
	returnCode     byte
	sessionPresent bool
}
type PublishToken struct {
	baseToken
	messageID uint16
}

然后在发送一条control packet 完了之后我们  WaitTimeoutWait 在这个token上等待ack消息,
下面就看下pub的流程

func (c *client) Publish(topic string, qos byte, retained bool, payload interface{}) Token {
	token := newToken(packets.Publish).(*PublishToken)
	pub := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket)
     select {
		case c.obound <- &PacketAndToken{p: pub, t: token}: 
		case <-time.After(publishWaitTimeout):
			token.setError(errors.New("publish was broken by timeout")) // 正常是上面case, 只有在 oboud满了的时候才会触发整个time
		}
    	return token // 如果应用层wait的话就会block在哪里
}

// 发送消息的处理
func outgoing(c *client) {
  select {
		case pub := <-c.obound:
			msg := pub.p.(*packets.PublishPacket)
            msg.Write(c.conn) // 往socket上写消息
			if msg.Qos == 0 {
				pub.t.flowComplete() // Qos 消息不需要check puback 直接complete 即可
			}
}
// 接收到 pub ack的处理
func alllogic(c *client) {
  select {
    case *packets.PubackPacket:
			DEBUG.Println(NET, "received puback, id:", m.MessageID)
			c.getToken(m.MessageID).flowComplete() // qos1 消息收到 puback
  }  
}

ackFunc 的使用

开源软件里对语言的使用真的是让你眼花缭乱, 在Qos1 和Qos 2 中由于客户端收到一个Publish之后需要 response 一个PubrecPacket或者Puback, 在 net.go中就抽象出了一个 ackFunc

func (c *client) ackFunc(packet *packets.PublishPacket) func() {
	return func() {
		switch packet.Qos {
		case 2:
			pr := packets.NewControlPacket(packets.Pubrec).(*packets.PubrecPacket)
			pr.MessageID = packet.MessageID
			DEBUG.Println(NET, "putting pubrec msg on obound")
			select {
			case c.oboundP <- &PacketAndToken{p: pr, t: nil}:
			case <-c.stop:
			}
			DEBUG.Println(NET, "done putting pubrec msg on obound")
		case 1:
			pa := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
			pa.MessageID = packet.MessageID
			DEBUG.Println(NET, "putting puback msg on obound")
			persistOutbound(c.persist, pa)
			select {
			case c.oboundP <- &PacketAndToken{p: pa, t: nil}:
			case <-c.stop:
			}
			DEBUG.Println(NET, "done putting puback msg on obound")
		case 0:
			// do nothing, since there is no need to send an ack packet back
		}
	}

}

然后在message dispatch函数中通知到应用层的handler之后就直接ack掉了.

go func() {
  hd(client, m) // 调用应用层的handler函数
  m.Ack() // 发送对应的ack
}()

总结

就是不断的抽象, 公用的代码一定要抽象出来.

emqtt中的keepalive实现

针对keepalive我们先来看下协议3.1.4 中针对keepalive的描述:
"3.14 Response
...
4. Start message delivery and keep alive monitoring."

也就是说在connect成功之后开始keepalive timer.下面看下代码

%% deps/emqttd/src/emqttd_protocol.erl

process(?CONNECT_PACKET(Var), State0) ->
    case validate_connect(Var, State1) of
        ?CONNACK_ACCEPT ->
            case authenticate(client(State1), Password) of
                {ok, IsSuperuser} ->
                    %% Generate clientId if null
                    State2 = maybe_set_clientid(State1),

                    %% Start session
                    case emqttd_sm:start_session(CleanSess, {clientid(State2), Username}) of
                        {ok, Session, SP} ->
                            %% Register the client
                            emqttd_cm:reg(client(State2)),
                            %% Start keepalive
                            start_keepalive(KeepAlive, State2) %% 开始keepalive的timer
%% deps/emqttd/src/emqttd_protocol.erl
start_keepalive(0, _State) -> ignore;

start_keepalive(Sec, #proto_state{keepalive_backoff = Backoff}) when Sec > 0 ->
    self() ! {keepalive, start, round(Sec * Backoff)}.

再后面就是client process里对上年{keepalive, start}的处理了

handle_info({keepalive, start, Interval}, State = #client_state{connection = Conn}) ->
    ?LOG(debug, "Keepalive at the interval of ~p", [Interval], State),
    StatFun = fun() ->
                case Conn:getstat([recv_oct]) of
                    {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
                    {error, Error}              -> {error, Error}
                end
             end,
    case emqttd_keepalive:start(StatFun, Interval, {keepalive, check}) of
        {ok, KeepAlive} ->
            {noreply, State#client_state{keepalive = KeepAlive}, hibernate};
        {error, Error} ->
            ?LOG(warning, "Keepalive error - ~p", [Error], State),
            shutdown(Error, State)
    end;

handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) ->
    case emqttd_keepalive:check(KeepAlive) of
        {ok, KeepAlive1} ->
            {noreply, State#client_state{keepalive = KeepAlive1}, hibernate};
        {error, timeout} ->
            ?LOG(debug, "Keepalive timeout", [], State),
            shutdown(keepalive_timeout, State);
        {error, Error} ->
            ?LOG(warning, "Keepalive error - ~p", [Error], State),
            shutdown(Error, State)
    end;

针对协议中的
“It is the responsibility of the Client to ensure that the interval between Control Packets being sent does not exceed the Keep Alive value. In the absence of sending any other Control Packets, the Client MUST send a PINGREQ Packet ”
他是通过Conn:getstat([recv_oct]) 来获取底层socket收到的数据包的变化,如果recv_oct有发生变化则表示有其他的control packet包接受。如果两次check失败则认为client掉线,进行掉线处理。

EMQ源码之--EMQ的启动

又回到erlang了,使用了一段时间的golang再回到erlang有点那么的亲切感。在项目中也准备用mqtt来做消息上报,顺道就想看下他的代码。

erlang中application都是通过supervisor来管理的,在emq中emqttd_sup是一个最大的supervisor,他下面面又连接了很多的supervisor或者worker。

ekka:start()
emqttd_sup
|-------->emqttd_ctl 负责从emqttd_ctl命令过来的rpc handler
|-------->emqttd_hooks(hook 函数的处理)
|-------->emqttd_router(各node之间的消息路由)
|-------->emqttd_pubsub_sup(管理pubsub相关的supervisor)
|-------->emqttd_pool_sup(emqttd_pubsub的supervisor)gproc_pool
               |----->emqttd_pubsub_1(worker)
               |----->emqttd_pubsub_2(worker)
|-------->emqttd_pool_sup(emqttd_server的supervisor)gproc_pool
               |----->emqttd_server_1(worker)
               |----->emqttd_server_2(worker)
|--------->emqttd_stats(stats topic相关的统计)
|--------->emqttd_stats(metrics topic相关的统计)
|--------->emqttd_pool_sup(pooler没看到哪里用到了这快)gproc_pool
             |------->pooler_1(worker)
             |------->pooler_2(worker)
|--------->emqttd_sm_sup( session management supervisor)gproc_pool
             |------->emqttd_sm_1(worker)
             |------->emqttd_sm_2(worker)
|--------->emqttd_ws_client_sup(websocket client supervisor)gproc_pool
             |------->emqttd_ws_client_1(worker)
             |------->emqttd_ws_client_2(worker)
|--------->emqttd_broker(broker统计相关handler)
|--------->emqttd_alarm(系统alerm相关的handler)
|--------->emqttd_mod_sup(管理外部mod的supervisor)
|--------->emqttd_bridge_sup_sup(bridge supervisor)
|--------->emqttd_access_control(auth/acl相关管理模块)
|--------->emqttd_sysmon_sup(system monitor supervisor)
             |-------->emqttd_sysmon(vm system monitor)
register_acl_mod()
start_listener()

之后就开始socket监听了,等待新的连接到来。
erlang的优势在于他又一套完善的process 监控系统。具体可以参考这里, 子进程退出后supervisor会给你自动重启。

一周一命令之ps

ps - report a snapshot of the current processes.

其实和pidstat功能有部分重叠, 不过ps命令更灵活,各种统计项都可以直接定义,所以个这个命令在线上使用也比较多。

  • ps aux | grep xxx 查看进程有没有起来,以及根据cmd查询pid
  • ps -eo pid,user,args --sort user 用户指定统计项并有--sort 进行排序,详细可定制的统计项和可以才与排序的key可以查看man
  • 配合watch -n 1 进行统计实时监控 watch -n 1 'ps -eo pid,ppid,cmd,%mem,%cpu --sort=-%mem | head'
    image
  • ps -ef --forest | grep -v grep | grep xxx 展示进程树
    image
  • ps -fp xx,xxx 只统计指定进程的信息

在Go中友好的打印http的request

开始我们使用的方式可能就是fmt.Println(req)了, 但是这种方式的打印看上去很不友好.你看到的request可能是这样的.

&{GET /hello/lizhengqian HTTP/1.1 1 1 map[User-Agent:[PostmanRuntime/7.1.1] Accept:[*/*] Accept-Encoding:[gzip, deflate] Connection:[keep-alive] Cache-Control:[no-cache] Postman-Token:[8ad0dcda-a703-4a09-a54f-1e62a156e35c]] {} <nil> 0 [] false 0.0.0.0:8082 map[] map[] <nil> map[] 127.0.0.1:51577 /hello/lizhengqian <nil> <nil> <nil> 0xc4201ba640}

这里如果你对request的每个字段不太熟悉的话这里打印出来的东西就只能大概的去猜了.

通过http dump 可以很完美的将头里的所有信息给打印出来了.

requestDump, err := httputil.DumpRequest(req, true)
if err != nil {
fmt.Println(err)
}
fmt.Println(string(requestDump))

完美的输出

GET /hello/lizhengqian HTTP/1.1
Host: 0.0.0.0:8082
Accept: */*
Accept-Encoding: gzip, deflate
Cache-Control: no-cache
Connection: keep-alive
Postman-Token: 548c6c27-2f6c-491d-9fc3-31417d8345dc
User-Agent: PostmanRuntime/7.1.1

贫穷的本质

作为2019年诺贝尔经济学奖获得者书,并不像大多数的畅销书,书中使用了大量的使用了统计数据和实地实验考察数据。本书中主要关心的是他们是生活在每天0.99美元的贫困线之下的10亿人民, 由于一个作者是一位印度人,所以书中有很多描述印度的情况,相对来说描述**的部分就少很多(也有可能是**最近在这方面取得了很大的突破)。

**在城镇化上上作出了很大的一步,尤其是异地搬迁项目,确实是很多人搬里了农村,就目前的户籍制的限制来看,怎么样这些新城镇人重新就业并在城市享受到一样的待遇应该是一个社会很大的难点。

NSQ源码-nsqlookupd

为什么选择nsq

之前一直在用erlang做电信产品的开发,对erlang的一些生态也比较了解,和erlang相关的产品在互联网公司使用最多的应该就是rabbitmq了,也许很多人听说过erlang就是因为他们公司在使用rabbitmq。在之前也看过一点rabbitmq的代码,以及后来的emqtt都看过一点, 所以对消息队列这块是情有独钟。转到go后也在关注消息队列这块,nsq是一个golng的消息系统, 而且架构也非常的简单。所以想通过源码的学习来掌握一些语言技巧。

nsq的架构与代码结构

nsq的的话主要有三个模块构成, 这里直接复制官方的介绍:

nsqd: is the daemon that receives, queues, and delivers messages to clients.

nsqlookupd: is the daemon that manages topology information and provides an eventually consistent discovery service.

nsqadmin: is a web UI to introspect the cluster in realtime (and perform various administrative tasks).

1

这里是一个消息投递的过程, 显示了消息怎么从nsqd到达consumer, 缺少了producer和nsqlookupd. nsqlookupd主要提供了两个功能:

  • 向nsqd提供一个topic和channel的注册信息
  • 对consumser提供了toic和channel的查询功能然后

consumer查询到nsqd之后就是上面看到的动态图了, consumer直接和nsqd通信, 下面是一个更全面一点的时序图
image

整个项目的代码结构也是围绕上面的三个模块构建:

  • internal(公共部分的实现)
  • nsqadmin(对nsqadmin的时间)
  • nsqd(对nsqd的实现)
  • nsqlookupd(对nsqlookupd的实现)

总共也就这四个package,是不是有很想看下去的冲动(smile).

lookupd的启动流程

经过上面的介绍,我们对lookupd有里简单的认识.首先他是一个独立的进程, 为topic和channel的发现服务. 但不参与时间的消息投递. 对lookup的实现是在nsq/apps/nsqlookupd/nsqlookupd.go和nsq/nsqlookupd/中. lookupd的启动是使用了一个叫go-srv的windows wrapper.通过在nsq/apps/nsqlookupd/nsqlookupd.go中实现:

type Service interface {
	// Init is called before the program/service is started and after it's
	// determined if the program is running as a Windows Service.
	Init(Environment) error

	// Start is called after Init. This method must be non-blocking.
	Start() error

	// Stop is called in response to os.Interrupt, os.Kill, or when a
	// Windows Service is stopped.
	Stop() error
}

来完成整个进程的管理,go-srv帮助我们做了系统信号的管理, 下面来看下lookupd的启动流程,

实例化一个NSQLookupd对象

// apps/nsqlookupd/nsqlookupd.go
	daemon := nsqlookupd.New(opts)  // 实例化一个NSQLookupd的对象
	err := daemon.Main()            // 开始启动NSQLookupd
	
// nsq/nsqlookupd/nsqlookupd.go
func New(opts *Options) *NSQLookupd {
     ....
	n := &NSQLookupd{
		opts: opts,    // 启动参数
		DB:   NewRegistrationDB(), // 内从里面的一个数据库,主要用来存储tpoic/channel以及nsqd的消息
	}
	...
	return n
}

开始启动

// Main starts an instance of nsqlookupd and returns an
// error if there was a problem starting up.
func (l *NSQLookupd) Main() error {
	ctx := &Context{l}

	// 启动两场go routine来处理tcp/http的请求
	tcpListener, err := net.Listen("tcp", l.opts.TCPAddress)
	if err != nil {
		return fmt.Errorf("listen (%s) failed - %s", l.opts.TCPAddress, err)
	}
	httpListener, err := net.Listen("tcp", l.opts.HTTPAddress)
	if err != nil {
		return fmt.Errorf("listen (%s) failed - %s", l.opts.TCPAddress, err)
	}

	l.tcpListener = tcpListener
	l.httpListener = httpListener

	tcpServer := &tcpServer{ctx: ctx}
	l.waitGroup.Wrap(func() {
		protocol.TCPServer(tcpListener, tcpServer, l.logf)
	})
	httpServer := newHTTPServer(ctx)
	l.waitGroup.Wrap(func() {
		http_api.Serve(httpListener, httpServer, "HTTP", l.logf)
	})

	return nil
}

下面是一个lookupd里面的进程模型
image

lookupd里的主要数据结构

在上面创建一个instance的时候我们看到创建一个NewRegistrationDB()的函数, 这里就是存储lookupd所有数据结构的地方.
image

每个topic/channe/clientl就是一个Registration的key, 然后value对应的就是该topic/channel对应的nsqd信息.所有的接口都是在操作上面的那个数据结构.

lookupd和其他模块的交互

在进程模型中我们看到一个tcp server和一个http seerver, 和其他模块之间的交互都是在里面完成的.看下tcp server的处理

有新的tcp连接进来,创建一个新的go routine去服务该请求

// /nsq/internal/tcp_server.go
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {
	for {
		...
		go handler.Handle(clientConn)
	}

实例化一个protocol对象

// /nsq/nsqlookupd/tcp.go
func (p *tcpServer) Handle(clientConn net.Conn) {
	...
	prot.IOLoop(clientConn)
	...
}

对请求的具体处理

// /nsq/nsqlookupd/lookup_protocol_v1.go
func (p *LookupProtocolV1) IOLoop(conn net.Conn) error {
	...
	p.Exec(client, reader, params)
	...
}

// /nsq/nsqlookupd/lookup_protocol_v1.go
func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
	switch params[0] {
	case "PING": // NSQD的心跳包
		return p.PING(client, params)
	case "IDENTIFY": // NQSD启动时候的indentify就是我们上面看到的peerInfo
		return p.IDENTIFY(client, reader, params[1:])
	case "REGISTER": // 注册topic/channel信息到lookupd
		return p.REGISTER(client, reader, params[1:])
	case "UNREGISTER": // unregister topic/lookup 信息
		return p.UNREGISTER(client, reader, params[1:])
	}
	return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}

上面就是整个tcp server的流程, 每个连接都是一个go routine. 相对tcp server来说的话http server就简单很多, 如果你对httprouter熟悉的话就更简单了就是对RegistrationDB的增删查改. http测的api的话可以参考:
官方的文档

总结

lookupd是其中比较简单的模块,通过源码的学习我们可以更好的掌握go的一些技巧,也鼓励大家通过一一些开源的代码来掌握语言的一些技巧。其实通过lookupd我们可以抽象一套自己的HTTP/TCP服务端架构来。

NSQ源码-Nsq客户端

看完lookupd和nsqd之后我们再来看下nsq client端的代码。 我是想把nsq系统完完整整的看一遍,从而对他形成一个更整体的
认识。对message queue来说他的client端就是生产者和消费者,生产者负责想nsq中投递消息,消费者负责从lookupd中获取到
指定nsqd之后,从nsqd中获取消息。

生产者

我们以nsq/apps/to_nsq/to_nsq.go为例,客户端这边的代码逻辑就简单很多,NewProducer实例化一个instance,publish消息
到nsqd。

/// nsq/apps/to_nsq/to_nsq.go
producer, err := nsq.NewProducer(addr, cfg)
err := producer.Publish(*topic, line)

下面来看下Publish里的具体逻辑。

// Publish synchronously publishes a message body to the specified topic, returning
// an error if publish failed
func (w *Producer) Publish(topic string, body []byte) error {
	// 生成具体的cmd
	return w.sendCommand(Publish(topic, body))
}
func (w *Producer) sendCommand(cmd *Command) error {
	doneChan := make(chan *ProducerTransaction)
	err := w.sendCommandAsync(cmd, doneChan, nil)
	if err != nil {
		close(doneChan)
		return err
	}
	t := <-doneChan
	return t.Error
}
func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
	args []interface{}) error {
	// keep track of how many outstanding producers we're dealing with
	// in order to later ensure that we clean them all up...
	atomic.AddInt32(&w.concurrentProducers, 1)
	defer atomic.AddInt32(&w.concurrentProducers, -1)

	if atomic.LoadInt32(&w.state) != StateConnected {
		// 这里是一个lazily connect
		err := w.connect()
		if err != nil {
			return err
		}
	}

	t := &ProducerTransaction{
		cmd:      cmd,
		doneChan: doneChan,
		Args:     args,
	}

	select {
	case w.transactionChan <- t:
	case <-w.exitChan:
		return ErrStopped
	}

	return nil
}

在connect函数里启动了一个go routine去处理transactionChan对应的东西

func (w *Producer) connect() error {
	w.closeChan = make(chan int)
	w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
	w.conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d (%%s)", w.id))
	_, err := w.conn.Connect()
	w.wg.Add(1)
	go w.router()

这里需要注意一下, go-nsq/conn.go是对底层连接的一个抽象,他是不关心你是生产者还是消费者,这里使用到了
delegate 模式,conn.go收到消息的处理放到了producerConnDelegate和consumerConnDelegate中,然后通知到具体的
消费者活着生产者。

消费者

回过头我们再来看下消费者部分的代码,client端我们以nsq/apps/nsq_tail/nsq_tail.go为例,代码的基本逻辑如下:

// 1. new comsunmer instanace 
consumer, err := nsq.NewConsumer(topics[i], *channel, cfg)
// 2. add handler
consumer.AddHandler(&TailHandler{topicName: topics[i], totalMessages: *totalMessages})
// 3. connect to nsqd
consumer.ConnectToNSQDs(nsqdTCPAddrs)
if err != nil {
	log.Fatal(err)
}
// 4. connect to lookupd
err = consumer.ConnectToNSQLookupds(lookupdHTTPAddrs)
if err != nil {
	log.Fatal(err)
}
consumers = append(consumers, consumer)

下面来看下每个部分的实际代码:

func (r *Consumer) AddHandler(handler Handler) {
	r.AddConcurrentHandlers(handler, 1)
}
func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
	if atomic.LoadInt32(&r.connectedFlag) == 1 {
		panic("already connected")
	}

	atomic.AddInt32(&r.runningHandlers, int32(concurrency))
	for i := 0; i < concurrency; i++ {
		go r.handlerLoop(handler)
	}
}

至此handler添加完成,起一个单独的go routine来等待消息的到了。

func (r *Consumer) handlerLoop(handler Handler) {
	r.log(LogLevelDebug, "starting Handler")

	for {
		message, ok := <-r.incomingMessages // 有新的消息的到来
		if !ok {
			goto exit
		}

		if r.shouldFailMessage(message, handler) {
			message.Finish()
			continue
		}

		err := handler.HandleMessage(message) // 调用之前注册的handler
		if err != nil {
			r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID)
			if !message.IsAutoResponseDisabled() {
				message.Requeue(-1)
			}
			continue
		}

		if !message.IsAutoResponseDisabled() {
			message.Finish()
		}
	}

exit:
	r.log(LogLevelDebug, "stopping Handler")
	if atomic.AddInt32(&r.runningHandlers, -1) == 0 {
		r.exit()
	}
}

官方是不推荐只部署nqd而不部署lookupd的,我们直接看下lookup的连接过程:

func (r *Consumer) ConnectToNSQLookupd(addr string) error {
	...
	r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, addr)
	numLookupd := len(r.lookupdHTTPAddrs)
	r.mtx.Unlock()

	// if this is the first one, kick off the go loop
	if numLookupd == 1 {
		r.queryLookupd()
		r.wg.Add(1)
		go r.lookupdLoop()
	}
	return nil
}

在queryLookupd中先去查询lookupd获取最新的nqd地址,然后connect to nsqd.

func (r *Consumer) lookupdLoop() {
	// add some jitter so that multiple consumers discovering the same topic,
	// when restarted at the same time, dont all connect at once.
	ticker = time.NewTicker(r.config.LookupdPollInterval)
	// 每个ticker interval更新nqd的地址信息
	for {
		select {
		case <-ticker.C:
			r.queryLookupd()
		case <-r.lookupdRecheckChan:
			r.queryLookupd()
		case <-r.exitChan:
			goto exit
		}
	}
}
func (r *Consumer) ConnectToNSQD(addr string) error {
	// 1. new connection
	conn := NewConn(addr, &r.config, &consumerConnDelegate{r})
	conn.SetLogger(logger, logLvl,
	fmt.Sprintf("%3d [%s/%s] (%%s)", r.id, r.topic, r.channel))

	// 2. connection list
	_, pendingOk := r.pendingConnections[addr]
	_, ok := r.connections[addr]

	r.pendingConnections[addr] = conn
	if idx := indexOf(addr, r.nsqdTCPAddrs); idx == -1 {
		r.nsqdTCPAddrs = append(r.nsqdTCPAddrs, addr)
	}

	r.log(LogLevelInfo, "(%s) connecting to nsqd", addr)
	// 3. new connect
	//   3.1 go c.readLoop()
	//   3.2 go c.writeLoop()
	resp, err := conn.Connect()
	
	// 4. sub to nsqd
	cmd := Subscribe(r.topic, r.channel)
	err = conn.WriteCommand(cmd)
}

以上就是客户端初始化的一个流程,然后就是接受消息处理了。

->NewConsumer() // 新建一个consumer
->ConnectToNSQLookupds() // 连接到lookupd
  |->ConnectToNSQLookupd() // 连接到lookupd
     |->r.queryLookupd() // 查询lookupd的
     	|->apiRequestNegotiateV1() // 调用lookupd的rest api获取nsqd消息
     	|->ConnectToNSQD() // 连接到具体nsq
     	   |->NewConn() // 连接instance
     	   |->conn.Connect() // 开始连接
     	   	  |->c.readLoop() // 与nqd连接read loop
     	   	  |->c.writeLoop() // 与nqd连接write loop
     	   |->Subscribe() // consumer发送SUB command
     |->lookupdLoop() // 定时查询lookupd并更新nsqd信息

注:

[1]. 关于delegate模式参考 这里

Docker入门与实践

Docker三大概念:

镜像(Image):

镜像就是一个文件, 你从repository pull下来的就是一个image, 后期可以运行起来.

容器(container):

镜像运行起来之后的一个隔离层, 各个应用可以运行的不同的容器里以起到隔离的作用.

仓库(Repository):

下载镜像的地方

docker images 列出本地的images
docker inspect id 查看image的详细信息
docker search name 搜索镜像
docker rmi image 删除镜像
docker run -ti ubuntu:16.04 运行一个image
docker commit -m "xxx" 修改之后提交
docker save -o xxxx.tar xxxx:xxxxx 导出镜像
docker load --input xxxx.tar 载入镜像
docker tag xxxx:xxx xxxxx:xxxxx 重新打标签
docker push xxxxx:xxxxx push到共有云上, 默认是dockerhub

docker run -xxx 启动一个docker容器
docker stop xxx 停止一个容器
docker ps -a -q 查看以停止的容器

docker run -d 以守护进程的方式启动
docker attach NAMES attach到一个容器上
docker rm 删除容器

docker export xxx > xxx.tar 导出
cat xx.tar > docker import - xxx/xxxx:v111 导入

docker -v /xxx/xxxx:/xxx/xxx 数据卷的共享, 将本地的xxxx/xxx 挂到容器里的xxx/xxx 这样可以实现文件同步.

docker -p 5060:5060/udp 将本地的5060端口映射到容器里的5060上, /udp 指定udp端口
docker -p 127.0.0.1::5060 随机分配一个本地端口给容器的5060
docker xxxx --name xxx 指定容器名
docker run --link xxx:xxx 将容器xxx链接到xxx上面, 这样实现容器互通

一周一package之http server

想对于http client, 作为后端开发基本上每天都在和http server端打交道,时刻都想着怎么提高服务端的处理能力, 虽然一般项目中也不会直接使用裸的net http 包,基本上都是使用框架提供的那一套东西来处理业务逻辑。因为框架最终还是会使用底层的http 包,好好过一遍http server端的代码对你更好的理解框架有很好的帮助。接下来我们就进入net http的探索之路。

这里代码分析是以 net/http/server.go 为主。

下面还是以一个最简单的例子来开始我们的学习之路。

package main
import (
	"fmt"
	"log"
	"net/http"
)
func main() {
	http.HandleFunc("/foo", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "hello world") })
	log.Fatal(http.ListenAndServe(":8080", nil))
}

以上是一个很简单http server,http.HandleFunc 中注册了 /foo 路由,然后下面监听了8080 端口。看到这段代码之后,会有以下的一些疑问:

  1. foo 路由是怎么注册的,具体注册到哪里的?
  2. 有请求来之后路由是怎么处理的,foo, foo/ 有这样的两个路路由的话是怎么处理的?
  3. 端口监听新请求到来之后是怎么处理的?
  4. http 升级websocket是怎么处理的?
  5. http协议的处理流程是什么样的?

现在就让我们带着这些问题前行吧。

路由注册

在上面的代码中http.HandleFunc 会将 /foo 这个路由合注册到一个Default 的ServeMux上,这里就有一个新的概念出来了, 啥是ServeMux? 这里就可以理解为http的路由表,当一个请求到来之后ServeMux 会根据reqest中 URL的pattern和注册的路由表进行匹配。一个基本的匹配原则就是req中的url和最佳(前缀匹配最长)的pattern去匹配,如果没有路由匹配的话就直接和带“/“的上一级path进行匹配,这里会一直递推到根路由 ”/“ 如果跟路由没有注册的话就返回404。 假设注册了/foo, /foo/bar /foo/ 三个路由,会发生以下的路由:

/foo -------------->/foo

/foo/bar--------->/foo/bar

/foo/xxxx------->/foo/

/bar ------------->404

下面是一个ServeMux的定义:

type ServeMux struct {
		mu    sync.RWMutex
		m     map[string]muxEntry // 路由表
		es    []muxEntry // slice of entries sorted from longest to shortest.
		hosts bool       // whether any patterns contain hostnames
	}
type muxEntry struct {
		h       Handler
		pattern string
	}

在我们调用http.HandleFunc的是时候, 会使用 DefaultServeMux 将其添加到ServeMux.m 中。

下面是在路由表中找到一个合适的位置并插入分代码

func appendSorted(es []muxEntry, e muxEntry) []muxEntry {
	n := len(es)
	i := sort.Search(n, func(i int) bool {
		return len(es[i].pattern) < len(e.pattern)
	})
	if i == n {
		return append(es, e)
	}
	// we now know that i points at where we want to insert
	es = append(es, muxEntry{}) // try to grow the slice in place, any entry works.
	copy(es[i+1:], es[i:])      // Move shorter entries down
	es[i] = e
	return es
}

代码有很好的copy属性,可以添加到自己的util库里,以后可以使用。

至此路由注册以及ok, 接下来看下一个请求到来之后的处理过程。

http server 监听

http 是基于tcp的,所以这里套路都一样:

  1. 开始监听一个端口
  2. 然后等等客户端的链接
  3. 每个连接单独起一个goroutine 去处理

实际代码中注意一下,这里将listener 放到了track列表里, 后期在Close和Shutdown的时候能够优雅的关闭。具体可以这个函数里的代码:

func (srv *Server) Serve(l net.Listener) error

新请求处理

针对新到来的connection 底层已经创建了一个go routine来单独处理这个请求,注意context里会把sever给带上了。在新请求的处理i过程中会分配两个buffer:

c.r = &connReader{conn: c}
c.bufr = newBufioReader(c.r)
c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)
func newBufioReader(r io.Reader) *bufio.Reader {
	if v := bufioReaderPool.Get(); v != nil {
		br := v.(*bufio.Reader)
		br.Reset(r)
		return br
	}
	// Note: if this reader size is ever changed, update
	// TestHandlerBodyClose's assumptions.
	return bufio.NewReader(r)
}

这种Buffer Pool的方式广泛使用在了协议解析/封装层面中(nsq/emitter/nats 都有使用), 使用一个pool来管理临时性的对象,这样会大大的减少内存申请/释放的次数。

上面已经申请好buffer了,接下来就是从底层的socket上读取http request了:

/ Read next request from connection.
func (c *conn) readRequest(ctx context.Context) (w *response, err error)

大概逻辑是这样:

  1. 设置读取reader的timeout时间
  2. 设置读取整个body的超时时间
  3. 在readRequest 方法中check http的request line然后读取所有的Header 并check header的有效性
  4. 在readerTransfer 中 check 是chunk传输或者是普通的text传输,两种方式的read 方式会有所不同,在chunk方式传输中client是按照chunk发送的,但是在服务端read的时候会将chunk合并好之后再给到用户层。具体的chunk的解析方式的话可以参考 net/http/internal/chunked.go

至此http request的解析已经完成了,也到了最重要的一个环节,路由请求到之前注册的handler

// HTTP cannot have multiple simultaneous active requests.[*]
// Until the server replies to this request, it can't read another,
// so we might as well run the handler in this goroutine.
// [*] Not strictly true: HTTP pipelining. We could let them all process
// in parallel even if their responses need to be serialized.
// But we're not going to implement HTTP pipelining because it
// was never deployed in the wild and the answer is HTTP/2.
serverHandler{c.server}.ServeHTTP(w, w.req)

这里最终会委托给 ServeMuxServerHTTP方法, 在ServeMux中会从之前注册的方法中找到最合适的pattern 进行匹配。

// ServeHTTP dispatches the request to the handler whose
// pattern most closely matches the request URL.
func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {
	if r.RequestURI == "*" {
		if r.ProtoAtLeast(1, 1) {
			w.Header().Set("Connection", "close")
		}
		w.WriteHeader(StatusBadRequest)
		return
	}
	h, _ := mux.Handler(r)
	h.ServeHTTP(w, r)
}

可以好好研究一下ServeHTTP中 ResponseWriter interface的使用,这里很好的将底层的response和用户层给隔离开了。

然后将http.ResponseWriter 和** http.Request** 传递给用户之前注册的handler

http.HandleFunc("/foo", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "hello world")})

这样HTTP层就把数据送到用户层了,然后就是用户层写response给ResponseWriter,函数从用户层返回之后就会继续执行

func (c *conn) readRequest(ctx context.Context) (w *response, err error)

后续方法,其中包括:

  1. 进行flush操作将所有数据写到底层socket上,然后将之前申请的buffer归还回去,这样一个http请求就完成了。
  2. 之后就看这条链接是否需要保留,如果需要保留的话Connection就进入Idle 状态,这样下一个请求来之后可以继续使用, 否则就释放链接。

server 优雅的关闭

随着各大公司开发微服务化之后,基本每周都会有一个版本的发布,这里就是设计到一个问题就是怎么优雅的升级,保证当前正在进行的请求不受影响。为了解决这个问题 Go http 在 1.8 中提供了优雅关闭策率。

逻辑如下:

  1. 关闭所有的Listener(之前将listener track起来就是这个目的),这样的话就不会有新的客户端连接请求进来
  2. 关闭所有的空闲链接, 保证不会有新球http请求到来
  3. 等当前active的链接处理完毕之后直接诶关闭

这里针对hijacked的连接, http 层面不会做特殊处理,如果需要收到相关的通知的话可以通过注册

func (srv *Server) RegisterOnShutdown(f func())

来收到 shutdown的通知。

从中学习到了什么

  1. 里面有golang的各种实用技巧,有些代码可以放到自己的代码库中,下次看到类似的模型之后可以直接拿出来使用
  2. 提供对代码的理解能力,这里也能帮助你个更好的使用http package里的代码,有些东西当你知道他大概是怎么实现的时候再次使用的时候心里就有底多了。

一周一命令之vmstat

vmstat ----- Report virtual memory statistics

又是一个整体系统分析的好帮手,这里看名字感觉是和虚拟内存相关的命令,其实vmstat可以统计很多信息, 这里命令也是有很多的模式

  • vm mode 展示系统整体情况
    image
    proc 的 r(running+waiting for run)+b(uninterruptible sleep) 就是top中系统负载的来源
    memory 统计里内存的使用情况
    swap swap In/out的统计
    IO 块设备的读写情况(是不是可以理解为磁盘的读写情况??)
    system 这个信息也是很关键, in 表示单位时间内的中断数, cs是上下文切换的次数
    cpu 这个就不用介绍了,cpu各个状态的站比
  • disk mode展示磁盘相关的情况
    image
    有个缺点就是这里展示的是系统启动以来的累计值而不是单位时间内的统计,这块可以通过watch -n 1 vmstat -d来看每一行的变化频率。
    image
    还有一种方式就是直接使用sar -d 1来查看。

比较常用的也就这两个,-f 可以查看从开机到现在创建了多少个进程数, watch -n 1 vmstat -f 可以观察变化率
-m 查看slab相关的统计情况

<<Linux性能优化实战>> 内存篇 阅读笔记

nsqd中Message Delivery Guarantees的实现

在消息队列中有个很关键指标就是Qos, 这里也就决定了这个消息队列服务对消息的态度中, 在mqtt中Qos有三个类型的。

Qos 0(最多一次)

qos 0 为最低一级别的,发送端只管发,不管接受着是否接受到了。
image

Qos 1(至少一次)

在qos中条添加了消息确认机制, 发送端着发送完成之后会等待接受端的Ack确认, 只有确认之后发送端才会从本地的copy总删除,如果一直没有收到ack的话就会触发重发机制。(这里需要注意的一点就是在该模式下消息可能会重复)
image

Qos 2(准确的一次)

Qos2 就相对严格很多,确保只有一条消息发送给接受端。
image

以上虽然只是mqtt中的定义, 消息队列的投递其实也就不外乎这几种, 然后各自有不同的协议,不同的实现而已, 下面就看下nsq中的实现。

nsq中实现

nsq使用的是qos1模式的消息投递,在协议中定义了一个FIN 的spec,来表示consumer接受到了消息,如果consumer一直没有发送FIN的话就会触发nsq中的重发逻辑。下面就看下nqd中的实现逻辑。

FINAnchor link for: fin
Finish a message (indicate successful processing)

FIN <message_id>\n

<message_id> - message id as 16-byte hex string
NOTE: there is no success response

首先在nsq启动的时候会启动一个单独的goroutine queueScanLoop, 这个goroutine又会启动4个worker线程, 这个goroutine定期去随机的检查channel中的数据,如果大于25%的channel有数据的话就继续让worker 去工作(发送inflight queue和defer queue里的消息)。
下面来看下具体的代码:

func (n *NSQD) Main() error {
	n.waitGroup.Wrap(n.queueScanLoop) // main 总启动queueScanLoop
}

func (n *NSQD) queueScanLoop() {
	workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
	responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
	closeCh := make(chan int)

	workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
	refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)  // 刷新 timer

	channels := n.channels()
        // 根据 channel数初始化worker pool
	n.resizePool(len(channels), workCh, responseCh, closeCh)

	for {
		select {
		case <-workTicker.C:
			if len(channels) == 0 {
				continue
			}
		case <-refreshTicker.C:
			channels = n.channels()
			n.resizePool(len(channels), workCh, responseCh, closeCh) // 重新计算worker的个数
			continue
		case <-n.exitChan:
			goto exit
		}

		num := n.getOpts().QueueScanSelectionCount
		if num > len(channels) {
			num = len(channels)
		}

	loop:
                // 随机选择4个channel让worker线程去v处理
		for _, i := range util.UniqRands(num, len(channels)) {
			workCh <- channels[i]
		}

		numDirty := 0
		for i := 0; i < num; i++ {
			if <-responseCh { // worker 线程返回的结果
				numDirty++
			}
		}
                 // 判断是否让worker继续去处理
		if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
			goto loop
		}
	}

exit:
	n.logf(LOG_INFO, "QUEUESCAN: closing")
	close(closeCh)
	workTicker.Stop()
	refreshTicker.Stop()
}

之后channel在处理 inflight queue的时候首先取出消息, 然后通过 memoryMsgChan 通知到protol 中的msgPump,msgPump 将消息重新放入 inflight quene中并发送给comsumer.

func (c *Channel) processInFlightQueue(t int64) bool {
for {
		c.inFlightMutex.Lock()
		msg, _ := c.inFlightPQ.PeekAndShift(t)
		c.inFlightMutex.Unlock()

		if msg == nil {
			goto exit
		}
		dirty = true

		_, err := c.popInFlightMessage(msg.clientID, msg.ID)
		if err != nil {
			goto exit
		}
		atomic.AddUint64(&c.timeoutCount, 1)
		c.RLock()
		client, ok := c.clients[msg.clientID]
		c.RUnlock()
		if ok {
			client.TimedOutMessage()
		}
		c.put(msg)
	}

}
func (c *Channel) put(m *Message) error {
	case c.memoryMsgChan <- m: // 写入channel(只会有一个consumer收到该消息)
}
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
		case msg := <-memoryMsgChan:
			if sampleRate > 0 && rand.Int31n(100) > sampleRate {
				continue
			}
			msg.Attempts++

			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg)
			if err != nil {
				goto exit
			}
			flushed = false
}

一周一package之http client

前言

net http 包相对来说是一个很大的包,大概5万行左右(包含fcgi,cgi部分代码),这里计划分两部分来写, http client 和http server。本片之关注http client的实现。

http client 相信大家在日常工作中是最常用的一个包,像我们内部服务之间的交互就是依靠rest http cl来完成,相对于grpc 来说的话更简单直接。但是怎么用好 http client 也是大有学问的,比如defaultClient 该不该用等等。本文将带你理解golang http client 发送一条请求的整个过程。

golang http client 有一下几个特点:

1. client 内部通过缓存 tcp 链接来达到底层链接复用的的效果
2. 合理的处理各种redirect
3. client需要保证线程安全
4. 提供了自定义 Transport用户可以通过实现RoundTripper的方式对http 进行处理

源码分析

以下是使用http client最简单的一种方式,package官方文档中的第一个例子

resp, err := http.Get("http://example.com/")

在Get函数中会继续调用DefaultClient 的Do 方法,之后会继续往下走,建立链接--->发送请求-->等待相应。我们分析源码也是跟着这里流程去走,看下从网络连接到发送http package再到获取响应的整个过程。看源代码一定要有一个主线流程,这样你才不会走丢,如果两眼一抹黑看代码的话,你就很容易迷失在各种函数里。

整体架构

image

上图是http client的一个简单架构client层主要负责

  1. http request的组装
  2. redirect的处理,
  3. 如果Client.Timout不为空的时候设置cancel context

然后transport 负责多个connection的管理(包括idle connection)。由于连接复用,线程安全都是在transport做的,所以这部分代码也是在整个处理过程中最难理解的部分。

进程模型

image

以上是处理的整体流程,在图中忽略了一些错误处理的流程,整体上以transport层为核心,在transport中又以 persistConn的管理为中心。接下里分析下之前提到的链接复用,线程安全是如何做到的。

连接复用

在transport 中通过 MaxIdleConnsPerHost和 MaxIdleConns 来控制cient和服务端之间的总连接数,defaultClient中是每个host保持两个链接,本地总共可以保持100个persistConnection。有新的链接之后通过过LRU来淘汰老的connection, 也可以通过控制IdeConnTimeout 来控制空链接的持续时间。上面图中有提到了,在创建新链接的时候同时会等待其他链接的**完成,**如果有空闲连接的通知,就直接使用空闲链接(正在创建的链接需要cancel)。

线程安全

每个req都是由一个独立的persistConn来处理,每个connection 都是独立的。所以这里是安全的

逃坑指南

  1. 返回给应用层的response,获取resp.Body之后没有调用resp.Body.Close()。

这里首先明确的一点是response.Body中返回的是一个io.ReadCloser的interface 并不是buffer数据,然后看下persistConn的readLoop 方法。

图片

这里给上层返回response之后会等等,上面把数据读完,之后会把本次使用的connection放到链接池里(或者释放),这也就是为啥一定要关闭的原因。如果没有关闭这里就会造成泄露(除非request本身设置了cancal)。

  1. 使用deaultClient,由于defaultClient没有timeout所以请求可能会一直block,直到底层readWrite timeout。

总结

一定要根据自己的需求配置一个很是的Client/Transport, 有其实idle connection的量,一个合适的连接池对性能的提升是有很大帮助的。

刷新:重新发现商业与未来

image

作者上任之后微软就走上了变革之路,变革也很顺利。
能在那么大一个组织里领导一种变革确实实属不宜,而且微软的变革是从文化层面的变革。

1.文化的变革
2. 团队共识的重要性
3. 和竞争对手之间建立很好的合作关系

这本书看起来也很快,刷了三天基本上就看完。

一周一命令之sar

sar 是第一个让我 "啊哈"的命令, 正的是太强大了。

  • -B 统计页面调度信息
    image
  • -b 统计IO读写情况
    image
  • -d 块设备的读写情况
    image
  • -H 大页的使用情况
    image
  • -n 网络相关的东西情况(个人觉得这个是超牛逼的功能,尤其是E开头的几个选项)
    image

sar -n [DEV, EDEV, FC, ICMP, EICMP, ICMP6, EICMP6, IP, EIP, IP6, EIP6, NFS, NFSD, SOCK, SOCK6, SOFT, TCP, ETCP, UDP and UDP6] 不抓包的情况下快速分析的好帮手

  • -q 动态化的查看系统负载情况
    image
  • -r 统计内存使用情况
    image
  • -u cpu使用情况
    image

另外一个不常用的选项 -v 查看inode是使用情况, -W swapout/swapoutin的状况, -w 上下文切换的情况

包含了CPU/内存/磁盘/网络/电源....

<<Linux性能优化实战>> CPU篇 阅读笔记

缘起

老早就买了极客时间这个专栏,一直没有很完整的过一遍所有的章节,更多的是有遇到问题了之后再去查下某一个章节去看下,中间的学习也是断断续续。偶然的机会在油管看到tallkGo 最新的一次reading项目在讨论这个课程,就想着参加一下。依然是那句话 “一个人能够的很快,但是一群人能走的更远”。学习亦是如此。学习必须要不断的交流,交流中才会有新的碰撞。
目录:

CPU

第二讲 平均负载(2020.5.26)
第三讲 上下文切换理论(2020.5.27)
第四讲 CPU上下文切换之实战(2020.5.28)
第五讲 CPU使用率站比达到了100%快速定位(2020.5.29)
第六讲 整体CPU使用率很高,但top无法看到具体进程(2020.5.30)
第七讲 系统中出现大量不可中断进程和僵尸进程(2020.5.30)
第九讲 软中断(2020.5.31)
第十讲 软中断问题定位(2020.6.1)
第十一讲 如何快速分析出CPU的性能瓶颈(2020.6.3)
第十二讲 CPU 性能优化的几个思路(2020.6.4)

wireshark 分析TCP哪些事儿

前提摘要

在工作需要经常使用 wireshark 来分析抓包的数据,有时候打开一个.pcap文件的时候满屏的花花绿绿,看着是不是眼花撩乱。在这里总结一下这个工具的使用技巧。

首先来上一张各种颜色代表的含义图:
image
我们经常遇到的是 bad TCP 和红色的 RST, 后面主要来看下我们这样分析各种bad TCP。

通过这篇文章我总结一下怎么样更好的分析 TCP。

通过 expert Infomantation 查看本次抓包的统计情况。

针对上面的 Bad TCP wireshark 在expert Info里给出了具体的原因:
image
expert Info个颜色表示的意思参考:

Chat (grey): information about usual workflow, e.g. a TCP packet with the SYN flag set

Note (cyan): notable things, e.g. an application returned an “usual” error code like HTTP 404

Warn (yellow): warning, e.g. application returned an “unusual” error code like a connection problem
Error (red): serious problem, e.g. [Malformed Packet]

然后我们可以通过 Analyze → Expert Info 来查看统计信息:
image

通过这里可以看到有多少个重传,建立了多少个新的连接。

对各种 bad TCP 的解释

TCP ACKed unseen segment

Set when the expected next acknowledgement number is set for the reverse direction and it’s less than the current acknowledgement number.

TCP Dup ACK <frame> #<acknowledgement number>

Set when all of the following are true:

  • The segment size is zero.
  • The window size is non-zero and hasn’t changed.
  • The next expected sequence number and last-seen acknowledgment number are non-zero (i.e. the connection has been established).
  • SYN, FIN, and RST are not set.

TCP Fast Retransmission

Set when all of the following are true:

  • This is not a keepalive packet.
  • In the forward direction, the segment size is greater than zero or the SYN or FIN is set.
  • The next expected sequence number is greater than the current sequence number.
  • We have more than two duplicate ACKs in the reverse direction.
  • The current sequence number equals the next expected acknowledgement number.
  • We saw the last acknowledgement less than 20ms ago.

Supersedes “Out-Of-Order”, “Spurious Retransmission”, and “Retransmission”.

TCP Keep-Alive

Set when the segment size is zero or one, the current sequence number is one byte less than the next expected sequence number, and any of SYN, FIN, or RST are set.

Supersedes “Fast Retransmission”, “Out-Of-Order”, “Spurious Retransmission”, and “Retransmission”.

TCP Keep-Alive ACK

Set when all of the following are true:

  • The segment size is zero.
  • The window size is non-zero and hasn’t changed.
  • The current sequence number is the same as the next expected sequence number.
  • The current acknowledgement number is the same as the last-seen acknowledgement number.
  • The most recently seen packet in the reverse direction was a keepalive.
  • The packet is not a SYN, FIN, or RST.
  • Supersedes “Dup ACK” and “ZeroWindowProbeAck”.

TCP Out-Of-Order

Set when all of the following are true:

This is not a keepalive packet.
In the forward direction, the segment length is greater than zero or the SYN or FIN is set.
The next expected sequence number is greater than the current sequence number.
The next expected sequence number and the next sequence number differ.
The last segment arrived within the calculated RTT (3ms by default).
Supersedes “Spurious Retransmission” and “Retransmission”.

TCP Port numbers reused

Set when the SYN flag is set (not SYN+ACK), we have an existing conversation using the same addresses and ports, and the sequencue number is different than the existing conversation’s initial sequence number.

TCP Previous segment not captured

Set when the current sequence number is greater than the next expected sequence number.

TCP Spurious Retransmission
Checks for a retransmission based on analysis data in the reverse direction. Set when all of the following are true:

The SYN or FIN flag is set.
This is not a keepalive packet.
The segment length is greater than zero.
Data for this flow has been acknowledged. That is, the last-seen acknowledgement number has been set.
The next sequence number is less than or equal to the last-seen acknowledgement number.
Supersedes “Retransmission”.

TCP Retransmission

Set when all of the following are true:

This is not a keepalive packet.
In the forward direction, the segment length is greater than zero or the SYN or FIN flag is set.
The next expected sequence number is greater than the current sequence number.
TCP Window Full
Set when the segment size is non-zero, we know the window size in the reverse direction, and our segment size exceeds the window size in the reverse direction.

TCP Window Update

Set when the all of the following are true:

  • The segment size is zero.
  • The window size is non-zero and not equal to the last-seen window size.
  • The sequence number is equal to the next expected sequence number.
  • The acknowledgement number is equal to the last-seen acknowledgement number.
    None of SYN, FIN, or RST are set.

TCP ZeroWindow

Set when the window size is zero and non of SYN, FIN, or RST are set.

TCP ZeroWindowProbe

Set when the sequence number is equal to the next expected sequence number, the segment size is one, and last-seen window size in the reverse direction was zero.

If the single data byte from a Zero Window Probe is dropped by the receiver (not ACKed), then a subsequent segment should not be flagged as retransmission if all of the following conditions are true for that segment: - The segment size is larger than one. - The next expected sequence number is one less than the current sequence number.

This affects “Fast Retransmission”, “Out-Of-Order”, or “Retransmission”.

TCP ZeroWindowProbeAck

Set when the all of the following are true:

  • The segment size is zero.
  • The window size is zero.
  • The sequence number is equal to the next expected sequence number.
  • The acknowledgement number is equal to the last-seen acknowledgement number.
  • The last-seen packet in the reverse direction was a zero window probe.

Supersedes “TCP Dup ACK”.

更详细的 wirekshark 教程参考 https://www.wireshark.org/docs/wsug_html_chunked/index.html

NSQ源码-NSQD

看完了nsqlookupd我们继续往下看, nsqd才是他的核心. 里面大量的使用到了go channel, 相信看完之后对你学习go有很大的帮助.相较于lookupd部分无论在代码逻辑和实现上都要复杂很多.
不过基本的代码结构基本上都是一样的, 进程使用go-srv来管理, Main里启动一个http sever和一个tcp server, 这里可以参考下之前文章的进程模型小节, 不过在nsqd中会启动另外的两个goroutine queueScanLoop和lookupLoop。下面是一个
具体的进程模型。

image

后面的分析都是基于这个进程模型。

NSQD的启动

启动时序这块儿大体上和lookupd中的一致, 我们下面来看看lookupLoop和queueScanLoop.
lookupLoop代码见nsqd/lookup.go中 主要做以下几件事情:

  • 和lookupd建立连接(这里是一个长连接)
  • 每隔15s ping一下lookupd
  • 新增或者删除topic的时候通知到lookupd
  • 新增或者删除channel的时候通知到lookupd
  • 动态的更新options

由于设计到了nsq里的in-flight/deferred message, 我们把queueScanLoop放到最后来看.

一条message的LifeLine

下面我们就通过一条message的生命周期来看下nsqd的工作原理. 根据官方的QuickStart, 我们可以通过curl来pub一条消息.

curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'

http handler

我们就跟着代码看一下, 首先是http对此的处理:

// nsq/nsqd/http.go
func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
	...
	reqParams, topic, err := s.getTopicFromQuery(req) // 从http query中拿到topic信息
	...
}
// nsq/nsqd/http.go
func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, error) {
	reqParams, err := url.ParseQuery(req.URL.RawQuery)
	topicNames, ok := reqParams["topic"]
	return reqParams, s.ctx.nsqd.GetTopic(topicName), nil
}
// nsq/nsqd/nsqd.go
// GetTopic performs a thread safe operation
// to return a pointer to a Topic object (potentially new)
func (n *NSQD) GetTopic(topicName string) *Topic {
	// 1. 首先查看n.topicMap,确认该topic是否已经存在(存在直接返回)
	t, ok := n.topicMap[topicName]
	// 2. 否则将新建一个topic
	t = NewTopic(topicName, &context{n}, deleteCallback)
	n.topicMap[topicName] = t

	// 3. 查看该nsqd是否设置了lookupd, 从lookupd获取该tpoic的channel信息
	// 这个topic/channel已经通过nsqlookupd的api添加上去的, 但是nsqd的本地
	// 还没有, 针对这种情况我们需要创建该channel对应的deffer queue和inFlight
	// queue.
	lookupdHTTPAddrs := n.lookupdHTTPAddrs()
	if len(lookupdHTTPAddrs) > 0 {
		channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs)
	}
	// now that all channels are added, start topic messagePump
	// 对该topic的初始化已经完成下面就是message
	t.Start()
	return t
}

topic messagePump

在上面消息初始化完成之后就启动了tpoic对应的messagePump

// nsq/nsqd/topic.go
// messagePump selects over the in-memory and backend queue and
// writes messages to every channel for this topic
func (t *Topic) messagePump() {

	// 1. do not pass messages before Start(), but avoid blocking Pause() 
	// or GetChannel() 
	// 等待channel相关的初始化完成,GetTopic中最后的t.Start()才正式启动该Pump
	

	// 2. main message loop
	// 开始从Memory chan或者disk读取消息
	// 如果topic对应的channel发生了变化,则更新channel信息
	
	// 3. 往该tpoic对应的每个channel写入message(如果是deffermessage
	// 的话放到对应的deffer queue中
	// 否则放到该channel对应的memoryMsgChan中)。
}

至此也就完成了从tpoic memoryMsgChan收到消息投递到channel memoryMsgChan的投递, 我们先看下http
收到消息到通知pump处理的过程。

// nsq/nsqd/http.go
func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
	...
	msg := NewMessage(topic.GenerateID(), body)
	msg.deferred = deferred
	err = topic.PutMessage(msg)
	if err != nil {
		return nil, http_api.Err{503, "EXITING"}
	}

	return "OK", nil
}
// nsq/nsqd/topic.go
// PutMessage writes a Message to the queue
func (t *Topic) PutMessage(m *Message) error {
	t.RLock()
	defer t.RUnlock()
	if atomic.LoadInt32(&t.exitFlag) == 1 {
		return errors.New("exiting")
	}
	err := t.put(m)
	if err != nil {
		return err
	}
	atomic.AddUint64(&t.messageCount, 1)
	return nil
}
func (t *Topic) put(m *Message) error {
	select {
	case t.memoryMsgChan <- m:
	default:
		b := bufferPoolGet()
		err := writeMessageToBackend(b, m, t.backend)
		bufferPoolPut(b)
		t.ctx.nsqd.SetHealth(err)
		if err != nil {
			t.ctx.nsqd.logf(LOG_ERROR,
				"TOPIC(%s) ERROR: failed to write message to backend - %s",
				t.name, err)
			return err
		}
	}
	return nil
}

这里memoryMsgChan的大小我们可以通过--mem-queue-size参数来设置,上面这段代码的流程是如果memoryMsgChan还没有满的话
就把消息放到memoryMsgChan中,否则就放到backend(disk)中。topic的mesasgePump检测到有新的消息写入的时候就开始工作了,
从memoryMsgChan/backend(disk)读取消息投递到channel对应的chan中。 还有一点请注意就是messagePump中

	if len(chans) > 0 && !t.IsPaused() {
		memoryMsgChan = t.memoryMsgChan
		backendChan = t.backend.ReadChan()
	}

这段代码只有channel(此channel非golang里的channel而是nsq的channel类似nsq_to_file)存在的时候才会去投递。上面部分就是
msg从producer生产消息到吧消息写到memoryChan/Disk的过程,下面我们来看下consumer消费消息的过程。

首先是consumer从nsqlookupd查询到自己所感兴趣的topic/channel的nsqd信息, 然后就是来连接了。

tcp handler

对新的client的处理

//nsq/internal/protocol/tcp_server.go
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {
	go handler.Handle(clientConn)
}
//nsq/nsqd/tcp.go
func (p *tcpServer) Handle(clientConn net.Conn) {
	prot.IOLoop(clientConn)
}

针对每个client起一个messagePump吧msg从上面channel对应的chan 写入到consumer侧

//nsq/nsqd/protocol_v2.go
func (p *protocolV2) IOLoop(conn net.Conn) error {
	client := newClientV2(clientID, conn, p.ctx)
	p.ctx.nsqd.AddClient(client.ID, client)

	messagePumpStartedChan := make(chan bool)
	go p.messagePump(client, messagePumpStartedChan)

	// read the request
	line, err = client.Reader.ReadSlice('\n')
	response, err = p.Exec(client, params)
	p.Send(client, frameTypeResponse, response)

}
//nsq/nsqd/protocol_v2.go
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
	switch {
	case bytes.Equal(params[0], []byte("FIN")):
		return p.FIN(client, params)
	case bytes.Equal(params[0], []byte("RDY")):
		return p.RDY(client, params)
	case bytes.Equal(params[0], []byte("REQ")):
		return p.REQ(client, params)
	case bytes.Equal(params[0], []byte("PUB")):
		return p.PUB(client, params)
	case bytes.Equal(params[0], []byte("MPUB")):
		return p.MPUB(client, params)
	case bytes.Equal(params[0], []byte("DPUB")):
		return p.DPUB(client, params)
	case bytes.Equal(params[0], []byte("NOP")):
		return p.NOP(client, params)
	case bytes.Equal(params[0], []byte("TOUCH")):
		return p.TOUCH(client, params)
	case bytes.Equal(params[0], []byte("SUB")):
		return p.SUB(client, params)
	case bytes.Equal(params[0], []byte("CLS")):
		return p.CLS(client, params)
	case bytes.Equal(params[0], []byte("AUTH")):
		return p.AUTH(client, params)
	}
}
//nsq/nsqd/protocol_v2.go
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
	var channel *Channel
	topic := p.ctx.nsqd.GetTopic(topicName)
	channel = topic.GetChannel(channelName)
	channel.AddClient(client.ID, client)

	// 通知messagePump开始工作
	client.SubEventChan <- channel

通知topic的messagePump开始工作

func (t *Topic) GetChannel(channelName string) *Channel {
	t.Lock()
	channel, isNew := t.getOrCreateChannel(channelName)
	t.Unlock()

	if isNew {
		// update messagePump state
		select {
		case t.channelUpdateChan <- 1:
		case <-t.exitChan:
		}
	}

	return channel
}

message 对应的Pump

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
	for {
		if subChannel == nil || !client.IsReadyForMessages() {
			// the client is not ready to receive messages...
			// 等待client ready,并且channel的初始化完成
			flushed = true
		} else if flushed {
			// last iteration we flushed...
			// do not select on the flusher ticker channel
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			flusherChan = nil
		} else {
			// we're buffered (if there isn't any more data we should flush)...
			// select on the flusher ticker channel, too
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			flusherChan = outputBufferTicker.C
		}

		select {
		case <-flusherChan:
			// if this case wins, we're either starved
			// or we won the race between other channels...
			// in either case, force flush
		case <-client.ReadyStateChan:
		case subChannel = <-subEventChan:
			// you can't SUB anymore
			// channel初始化完成,pump开始工作
			subEventChan = nil
		case identifyData := <-identifyEventChan:
			// you can't IDENTIFY anymore
		case <-heartbeatChan:
			// heartbeat的处理
		case b := <-backendMsgChan:
			// 1. decode msg
			// 2. 把msg push到Flight Queue里
			// 3. send msg to client
		case msg := <-memoryMsgChan:
			// 1. 把msg push到Flight Queue里
			// 2. send msg to client
		case <-client.ExitChan:
			// exit the routine
		}
	}

至此我们看的代码就是一条消息从pub到nsqd中到被消费者处理的过程。不过得注意一点,我们在上面的代码分析中,创建
topic/channel的部分放到了message Pub的链上, 如果是没有lookupd的模式的话这部分是在client SUB链上的。

topic/hannel的管理

在NSQ内部通过

type NSQD struct {
	topicMap map[string]*Topic
}

type Topic struct {
	channelMap        map[string]*Channel
}

来维护一个内部的topic/channel状态,然后在提供了如下的接口来管理topic和channel

/topic/create - create a new topic
/topic/delete - delete a topic
/topic/empty - empty a topic
/topic/pause - pause message flow for a topic
/topic/unpause - unpause message flow for a topic
/channel/create - create a new channel
/channel/delete - delete a channel
/channel/empty - empty a channel
/channel/pause - pause message flow for a channel
/channel/unpause - unpause message flow for a channel

create topic/channel的话我们在之前的代码看过了,这里可以重点看下topic/channel delete的时候怎样保证数据优雅的删除的,以及
messagePump的退出机制。

queueScanLoop的工作

// queueScanLoop runs in a single goroutine to process in-flight and deferred
// priority queues. It manages a pool of queueScanWorker (configurable max of
// QueueScanWorkerPoolMax (default: 4)) that process channels concurrently.
//
// It copies Redis's probabilistic expiration algorithm: it wakes up every
// QueueScanInterval (default: 100ms) to select a random QueueScanSelectionCount
// (default: 20) channels from a locally cached list (refreshed every
// QueueScanRefreshInterval (default: 5s)).
//
// If either of the queues had work to do the channel is considered "dirty".
//
// If QueueScanDirtyPercent (default: 25%) of the selected channels were dirty,
// the loop continues without sleep.

这里的注释已经说的很明白了,queueScanLoop就是通过动态的调整queueScanWorker的数目来处理
in-flight和deffered queue的。在具体的算法上的话参考了redis的随机过期算法。

总结

阅读源码就是走走停停的过程,从一开始的无从下手到后面的一点点的把它啃透。一开始都觉得很困难,无从下手。以前也是尝试着去看一些
经典的开源代码,但都没能坚持下来,有时候人大概是会高估自己的能力的,好多东西自以为看个一两遍就能看懂,其实不然,
好多知识只有不断的去研究你才能参透其中的原理。

      一定要持续的读,不然过几天之后就忘了前面读的内容
      一定要多总结, 总结就是在不断的读的过程,从第一遍读通到你把它表述出来至少需要再读5-10次
      多思考,这段时间在地铁上/跑步的时候我会回向一下其中的流程
      分享(读懂是一个层面,写出来是一个层面,讲给别人听是另外一个层面)

后面我会先看下go-nsqd部分的代码,之后会研究下gnatsd, 两个都是cloud native的消息系统,看下有啥区别。

<<Linux性能优化实战>> IO篇 阅读笔记

灭火:美国金融危机及其教训

image

流感到来之后大家都在恐慌,发生“金融危机”的声音也是满天飞,于是想找来这方面的书读一读。

一来了解一下2008年金融危机爆发过程以及其对之后的经济造成的影响。

二来也是想更好的了解一下美国社会

最近你就会发想多所有的事情,中美都是两种态度,感觉两个 “成年人” 很难坐下来一起谈谈了。我在想美国的思维逻辑是什么样的?

redis 底层数据结构SDS(simple dynamic string)

看完sds.c/h 文件的第一感觉就是通过这些大牛的代码你能学习到很多大牛在语言上的*操作。

在看 sds(simple dynamic string) 之前我们来看下 char * 作为 C 中 string 类型有那么问题, 我们就从string.h里的几个函数出发:

  • 用strlen来求字符串的长度

    这个函数估计是c中使用最对的函数,由于strlen的复杂度为O(N),但对redis这种为高速系统而言这一点是无法容忍的。

  • strcat/strcpy

    缓冲区溢出,对对程序员来说就是噩梦,一个str操作你把别人内存给写了(尤其是多人开发中你把别人内存给写了)。ok这里缓存溢出问题你解决了,如果你服务端使用strcat, strcpy操作的话,还需面临一个问题就是如果dst大小不合适的话需要不断的allocate,大量allocate/free 必然会影响系统的效率。

  • 二进制安全

The strcat() and strncat() functions append a copy of the null-terminated string s2 to the end of the null-terminated string s1, then add a terminating `\0'. The string s1 must have sufficient space to hold the result. " copy from ,man of strcat "The strlen() function returns the number of characters that precede the terminating NUL character" --copy form man of strlen

从上面的描述来看,这个连个参数的操作为都与"\0"有关,而作为datebsse,我们不能对这里的存储进行假设。

sds就是为解决上面的问题而生, 下面看下redis中string的表示方式:

struct __attribute__ ((__packed__)) sdshdr8 {
    uint8_t len; /* used */
    uint8_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr16 {
    uint16_t len; /* used */
    uint16_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 {
    uint32_t len; /* used */
    uint32_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 {
    uint64_t len; /* used */
    uint64_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};

具体是的使用那种是通过分配的大小来决定的。header中的len表示了当前字符串的长度,这样求strlen的时候复杂度为O(1), alloc表示已分配的大小。下面来看下sdsnewlen的函数来学习一下sds的基本操作。

sds sdsnewlen(const void *init, size_t initlen) {	
	void *sh;
	sds s;
	
	// 根据传入的strlen来确定实际的类型(SDS_TYPE_5|8|16|32|64)
	char type = sdsReqType(initlen);
	/* Empty strings are usually created in order to append. Use type 8
	* since type 5 is not good at this. */
	if (type == SDS_TYPE_5 && initlen == 0) type = SDS_TYPE_8;
	int hdrlen = sdsHdrSize(type);
	unsigned char *fp; /* flags pointer. */
	// 分配大小 hdrlen ++ origin string len ++  '\0'
	sh = s_malloc(hdrlen+initlen+1);
	if (init==SDS_NOINIT)
	init = NULL;
	else if (!init)
	memset(sh, 0, hdrlen+initlen+1);
	if (sh == NULL) return NULL;
	// 获得string的地址 sh + hdrlen的偏移量
	s = (char*)sh+hdrlen;
	fp = ((unsigned char*)s)-1;
	
	switch(type) {
	    	// 设定具体的len 和alloc(初次分配的大小为len)
        case SDS_TYPE_5: {
            *fp = type | (initlen << SDS_TYPE_BITS);
            break;
        }
        case SDS_TYPE_8: {
            SDS_HDR_VAR(8,s);
            sh->len = initlen;
            sh->alloc = initlen;
            *fp = type;
            break;
        }
        case SDS_TYPE_16: {
            SDS_HDR_VAR(16,s);
            sh->len = initlen;
            sh->alloc = initlen;
            *fp = type;
            break;
        }
        case SDS_TYPE_32: {
            SDS_HDR_VAR(32,s);
            sh->len = initlen;
            sh->alloc = initlen;
            *fp = type;
            break;
        }
        case SDS_TYPE_64: {
            SDS_HDR_VAR(64,s);
            sh->len = initlen;
            sh->alloc = initlen;
            *fp = type;
            break;
        }
    }
	// copy 具体的str
	if (initlen && init)
	memcpy(s, init, initlen);
	s[initlen] = '\0';
	
	// 返回 c string(为了适配其他的c string 函数)
	return s;
	}

下面来按下里面几个宏操作,确实好多操作都是第一次见(有一种wow还可以这样操作的感觉)

#define SDS_HDR_VAR(T,s) struct sdshdr##T *sh = (void*)((s)-(sizeof(struct sdshdr##T)));

得到具体的sdshdr8|sdshdr16|sdshdr32|sdshdr64类型后来读取sdshdr->len

static inline size_t sdslen(const sds s) {
    unsigned char flags = s[-1];
    switch(flags&SDS_TYPE_MASK) {
        case SDS_TYPE_5:
            return SDS_TYPE_5_LEN(flags);
        case SDS_TYPE_8:
            return SDS_HDR(8,s)->len;
        case SDS_TYPE_16:
            return SDS_HDR(16,s)->len;
        case SDS_TYPE_32:
            return SDS_HDR(32,s)->len;
        case SDS_TYPE_64:
            return SDS_HDR(64,s)->len;
    }
    return 0;
}

但第一次见unsigned char flags = s[-1];得到 flag的操作后大开眼界, 第一次见c中index为-1的操作,后来想想这也就是指针和数组完美的结合。

c 中字节对齐相关__attribute__

__attribute__((aligned (n))),让所作用的结构成员对齐在n字节自然边界上。如果结构中有成员的长度大于n,则按照最大成员的长度来对齐。

__attribute__ ((packed)),取消结构在编译过程中的优化对齐,也可以认为是1字节对齐,紧凑性对齐,保证所有平台看到的大小一样。

emqtt中的一个 mqtt 连接的建立

在前面的文章中总结了一下 emqtt 的启动流程,下面来看一个 mqtt 连接建立的具体过程, 总结为一张图就是:
image

下面我们来逐步看下每个过程,emqtt 中每个 client 连接都有两个进程来维护, connection 进程和 process 进程。
connection 进程经由 esockd 来创建,主要负责协议的解析,流量的控制。 session 主要负责 mqtt 中 pub/sub 相关的业务。
本节中我们主要 focus 在 connection 进程上,来看下一个连接建立的具体过程。

socket 连接的建立

在系统启动的时候会启动一个 tcp 的 pool, 当有新的连接建立是就通过 eralng 的 MFA 机制来通知到指定的模块。

%% emqx_listeners.erl
start_mqtt_listener(Name, ListenOn, Options) ->
    SockOpts = esockd:parse_opt(Options),
    esockd:open(Name, ListenOn, merge_default(SockOpts),
                {emqx_connection, start_link, [Options -- SockOpts]}).

在这里我们对 1883 端口进行监听,当有新的connection到来时esockd就会调到 emqx_connection:start_link 上. emqx_connection 是对 gen_server behavior 的实现,这里每来一个连接之后也就创建了一个 erlang process.
然后即是一系列 gen_server 的 api, 下面来看下 init 里面初始化了哪些

init([Transport, RawSocket, Options]) ->
    case Transport:wait(RawSocket) of
        {ok, Socket} ->
            Zone = proplists:get_value(zone, Options),
            {ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]),
            {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
            Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]),
            RateLimit = init_limiter(proplists:get_value(rate_limit, Options)),
            PubLimit = init_limiter(emqx_zone:get_env(Zone, publish_limit)),
            ActiveN = proplists:get_value(active_n, Options, ?DEFAULT_ACTIVE_N),
            EnableStats = emqx_zone:get_env(Zone, enable_stats, true),
            IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
            SendFun = send_fun(Transport, Socket),
            ProtoState = emqx_protocol:init(#{peername => Peername,
                                              sockname => Sockname,
                                              peercert => Peercert,
                                              sendfun  => SendFun}, Options),
            ParserState = emqx_protocol:parser(ProtoState),
            GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
            GcState = emqx_gc:init(GcPolicy),
            State = run_socket(#state{transport    = Transport,
                                      socket       = Socket,
                                      peername     = Peername,
                                      conn_state   = running,
                                      active_n     = ActiveN,
                                      rate_limit   = RateLimit,
                                      pub_limit    = PubLimit,
                                      proto_state  = ProtoState,
                                      parser_state = ParserState,
                                      gc_state     = GcState,
                                      enable_stats = EnableStats,
                                      idle_timeout = IdleTimout
                                     }),
            ok = emqx_misc:init_proc_mng_policy(Zone),
            emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
            gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}],
                                  State, self(), IdleTimout);
        {error, Reason} ->
            {stop, Reason}
    end.
  1. 这对每个 client 做了流量控制 rate_limit/pub_limit(token buckets 算法)
  2. 设定 proto state, 然后就是 enter_loop 等待数据的到来

这里有个点需要注意就是在 run_socket 里对非 ssl 的 socket 设定 activeN

Transport:setopts(Socket, [{active, TrueOrN}]))

然后 socket 收到 N 个 packet 之后就会通过

%% Rate limit here, cool:)
handle_info({tcp_passive, _Sock}, State) ->
    {noreply, run_socket(ensure_rate_limit(State))};

来告诉这个 process 说收到多少个包了,然后在 ensure_rate_limit 里做了流量控制。

connect packet 处理

上面连接建立好之后 process 就 进到 gen_srever 的 loop 中等带消息的到来, 有新的消息的时候通过 handle_info 的方式通知到 connection process

handle_info({TcpOrSsL, _Sock, Data}, State) when TcpOrSsL =:= tcp; TcpOrSsL =:= ssl ->
    process_incoming(Data, State);

这里就表示一条消息来了。 下面就是具体消息的处理了,我们这里分析一下 connect, sub, pub 着三个消息

%% Parse and handle packets
handle_packet(<<>>, State) ->
    {noreply, State};

handle_packet(Data, State = #state{proto_state  = ProtoState,
                                   parser_state = ParserState,
                                   idle_timeout = IdleTimeout}) ->
    try emqx_frame:parse(Data, ParserState) of
        {more, ParserState1} ->
            {noreply, State#state{parser_state = ParserState1}, IdleTimeout};
        {ok, Packet = ?PACKET(Type), Rest} ->
            emqx_metrics:received(Packet),
            (Type == ?PUBLISH) andalso emqx_pd:update_counter(incoming_pubs, 1),
            case emqx_protocol:received(Packet, ProtoState) of
                {ok, ProtoState1} ->
                    handle_packet(Rest, reset_parser(State#state{proto_state = ProtoState1}));
                {error, Reason} ->
                    ?LOG(error, "Process packet error - ~p", [Reason]),
                    shutdown(Reason, State);
                {error, Reason, ProtoState1} ->
                    shutdown(Reason, State#state{proto_state = ProtoState1});
                {stop, Error, ProtoState1} ->
                    stop(Error, State#state{proto_state = ProtoState1})
            end;
        {error, Reason} ->
            ?LOG(error, "Parse frame error - ~p", [Reason]),
            shutdown(Reason, State)
    catch
        _:Error ->
            ?LOG(error, "Parse failed for ~p~nError data:~p", [Error, Data]),
            shutdown(parse_error, State)
    end.
  1. 更新 metrics 的信息
  2. 开始处理 Packet

注意在 emq 3.0 中增加了对 mqtt v5.0 的支持, 我们这里的分析还是以 v3.1.11 为主要版本

下面的 process_packet 主要负责对 connect 包的处理, 包括:

  1. 用户名/密码的校验
  2. session 的建立
  3. 向 connection_manager 注册自己
  4. start keepalive timer

之前的emqx_protocol:recvived 里面对 connect 包本身做了校验。

process_packet(?CONNECT_PACKET(), PState) ->

    NewClientId = maybe_use_username_as_clientid(ClientId, Username, PState),

    emqx_logger:set_metadata_client_id(NewClientId),

    %% TODO: Mountpoint...
    %% Msg -> emqx_mountpoint:mount(MountPoint, Msg)

    PState1 = set_username(),

    connack(
      case check_connect(ConnPkt, PState1) of
          {ok, PState2} ->
              case authenticate(credentials(PState2), Password) of  %% 权限验证
                  {ok, IsSuper} ->
                      %% Maybe assign a clientId
                      PState3 = maybe_assign_client_id(PState2#pstate{is_super = IsSuper}),
                      emqx_logger:set_metadata_client_id(PState3#pstate.client_id),
                      %% Open session
                      SessAttrs = #{will_msg => make_will_msg(ConnPkt)},
                      case try_open_session(SessAttrs, PState3) of %% session 进程的建立
                          {ok, SPid, SP} ->
                              PState4 = PState3#pstate{session = SPid, connected = true},
                              ok = emqx_cm:register_connection(client_id(PState4)),
                              true = emqx_cm:set_conn_attrs(client_id(PState4), attrs(PState4)),
                              %% Start keepalive
                              start_keepalive(Keepalive, PState4), %% start keepalive timer
                              %% Success
                              {?RC_SUCCESS, SP, PState4};
                          {error, Error} ->
                              ?LOG(error, "Failed to open session: ~p", [Error]),
                              {?RC_UNSPECIFIED_ERROR, PState1}
                      end;
                  {error, Reason} ->
                      ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason]),
                      {?RC_NOT_AUTHORIZED, PState1}
              end;
          {error, ReasonCode} ->
              {ReasonCode, PState1}
      end);

至此就完成了一个 mqtt 连接的建立, 然后及时 等待 pub/sub/ping 消息的到来了。后面我们再分析一下具体的消息路有。

去规模化:小经济的大机会

image

很赞同前言中提到的后工业时代的发展方向,重复性的工作必将会被替代。机械性的东西就应该交给机器去做,人应该更多的去关注产品,怎么将产品做的更好(提供使用体验)

k8s核心概念

image
photo by Julius Sivler

摘要

k8s最吸引我的是水平扩展自动容器恢复两个功能, 想想春节晚会抢红包和微博热点新闻等突发流量的时候快速扩容是多么的重要,如果手动去扩容几百台机器那绝对是异常噩梦. 容器的自动恢复这点就更关键了 根据“小概率时间必然发生必要发生”原则, 你的系统节点在某一时间点肯定会down, 对这种异常的down机处理起来应该都是事故级别的.

k8s这一套东西里的很多概念各公司现有的系统里也是在致力于解决这个问题, google 从Borg到k8s的公开将这一套系统标准化了. 大家都用一套标准的东西就是你在一家学习的东西可以在下家立马上手, 再也不用话很多的时间去熟悉公司的现有的发布系统. 这对公司还是个人都是很有利的.

核心架构:

k8s是一个典型的二层架构, 集群由master节点和worker 节点组成. master 主要负责集群的管理和控制, 用户通过ui/cli 方式和master节点进行交互, 然后master节点来完成调度和控制, master节点监控着每个worker节点的状态和系统资源, 当一个woker节点上的负载变的很高之后, 改节点上的任务就会调度到别的节点上.

image

核心概念

pod:

pod 是 k8s 中最小单元的调度及管理单元, 一个pod 是是有一个根容器(pause)和一些密切关联的用户容器组成, k8s会为每个pod分配一个pod ip,一个pod中的多个容器共享一个pod id.

在pod中引入 pause容器的原因:

  1. pod的状态判断, 如果一个pod有多个容器, 系统无法监控他的状态, 这里系统引入一个和业务无关并且很轻量级pause容器之后, 系统就根据pause容器的的状态来判断整个pod是否正常工作的状态.
  2. 在一个pod中的多个容器都是强关联的, 他们都是一些超亲密的服务, 他们的通信一般都是超级频繁的, 他们之间的通信就可以借助pause容器完成

k8s可以对pod 进行cpu和memory 的限定.

volume(存储卷):

volume 是作用在pod上的, 一个pod上的多个容器可以通过volume实现共享目录, volume的生命周期和pod的生命周期一致. volume支持各种存储系统.

deployment:

可以理解为k8s中对pod的管理工具, 其中定义了pod的副本数量以及版本等信息, controller通过定期检查系统中deployment中定义的pod的数量来实现pod的故障自恢复, 使用deployment可以实现pod的升级,重新发布, 回滚.

service:

从运维角度可以通过deployment来管理pod, service 是从用户角度来提供服务, 通过service k8s对外提供访问多个pod实例的稳定地址, 保证服务访问不受pod ip 变化的影响.
imageservice, deployment, pod之间的关系

namespace:

通过将资源分配到不同的namespace中,形成逻辑上的隔离, 每个资源都属于一

namespace, 同一个namespace中的资源必须保持唯一性. 默认使用default namespace.

image
系统中默认的namespace

label:

由用户指定到资源上的key:value对,可以附加到各种资源上(node,pod, service, rc 等),然后通过label select来查询拥有某些label的资源对象. 一般是在pod 上打不同的label, 其他管理对象如deployment, rc, daemonset, job 则通过选择不同的label来筛选pod.

核心组件:

image
整体架构图

api-serve(master)r:

管理集群的入口, 提供统一的rest API, 其他组件通过api-server来进行交互.

controller(master):

实现集群故障检测和恢复, 负责执行各种控制器:

节点控制器: 节点的管理

副本控制器(rc):负责维护系统中pod的数量

scheduler(master):

资源的调度, 根据pod上的资源计算pod适合调度到那台node上

etcd(mster):

集群 meta信息的存储.

kubelet(node):

集群在node上的代理, 负责角度到node上的pod的创建,修改,删除监控, 然后和api server进行交互.

kube-proxy(node):

通过在主机上维护网络规则并执行连接转发来实现Kubernetes服务抽象

docker(node):

docker的运行时环境

一次整体流程:

image
一次pod调度交互流程

  1. 通过通过ui或者cli 提交pod部署给到api-server
  2. api将pod信息存储到etcd中
  3. api-server 通知scheduler 有新的pod需要创建,看调度到那台机器上合适
  4. scheduler 根据pod所需要的资源以及node上的资源情况选择一个合适的node, 然后将这些信息告诉apiserver
  5. api 将这些信息存储到etcd 并告诉指定节点上的kubelet
  6. 节点上的kubelet 通知 container 进行image的拉取并运行

参考资源:

  1. <<Kubernetes权威指南>> 4th edition
  2. http://docs.kubernetes.org.cn/
  3. 阿里云云生公开课
  4. 极客时间 深入剖析Kubernetes

搭建本地k8s环境

k8s实在太诱人了,auto scale,故障自修复,CI/CD 一套东西对整个开发效率正的是很多提升。
之前在本地尝试都是走minikube,用minikube有两个不方便:
1. 基于minikube的教程很少,两本 大作 <<Kubernetes权威指南>>和 <>都不是基于minikube的,在学习中总会遇到一些问题
2. 在一个单机阉割版本上运行总感觉缺点啥

基于以上两个理由想在本地搭一套k8s环境, 采用1 master + 多个work的模式。 主机作为master node,新建三个虚拟机作为worker node. 主机为8核心16G dell笔记本, 新建的三个节点为1核心1G。

安装步骤可以参考 https://www.cnblogs.com/mouseleo/p/11930784.html

下面记录遇到的一些坑:

  1. 节点从 registry-1.docker.io 拉不下来image,一直不断的超时
    由于GW的原因无法自由的互联,可以考虑阿里的镜像源加速:
    image

  2. calico-node pod 一直起不来,看node log输出如下:
    image
    [从log来看的话是网络没有配置好calico导致的,需要修改calico的IP_AUTODETECTION_METHOD参数:
    - name: IP_AUTODETECTION_METHOD
    value: "interface=wlp3s0"
    这里肯那个需要修改worker 节点上的 network interface和主机节点的iinterface一致:

ip link set xxx down
ip link  set xxx name wlp3s0
ip link set wlps30 up

到此整个集群就搭建起来了:

节点信息

image

k8s pod 信息

image

然后就开始k8s的探索之旅。

一周一package之io包

缘起

记得好几年前到过介绍 python 包的网站,作者也是每周更新一个 package, 介绍一个包的用法。我这里也是按照这样的**,每周会更新一个包的用户,这里不仅有基本的用法,也有源码的分析,也有该包在 NSQ 中的使用方式的分析。

记录总结为第一目的。业务逻辑代码写多了之后你就会有种自己能力在下降的感觉,这种需要静下来回归到经典中去,回归到经典的源码中去。只有在经典的官方 package 中你才能看到语言的精髓。

io package

基础 io 操作

还记得学习 C++ 的时候书中专门有一章来讲解 C++ 中 io 中每个类的继承关系,所有的语言语言中 io 包应该是最能体现语言抽象能力的 package。基于 linux 中 “一切皆文件” 的**,一切的操作你都会涉及到 “” 和 “”。

  1. 在网络中你要从 socket 总读写数据

  2. 在文件操作中你要读写数据

  3. 本地 buffer 操作你也要读写数据

  4. 编解码操作

  5. 字符串操作

    golang 中是使用 interface 来表达抽象的,在 io package 中你会看到很多的 interface 定义,其中最关键的就是下面三个:

type Reader interface {
	Read(p []byte) (n int, err error)
}
type Writer interface {
	Write(p []byte) (n int, err error)
}
type Closer interface {
	Close() error
}   

这三个 interface 又会延伸出其他 interface:

  1. ReadWriter
  2. ReadCloser
  3. WriteCloser
  4. ReadWriteCloser

还有 seek 方式读写的 **Seeker, **以及从哪里读的 **ReaderFrom 和读到哪里的 WriterTo **等等,里面有超多的 interface 可以参考 https://godoc.org/io

网络数据的读取

// 一个很简单的 http handler
func (h *countHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.n++
    // 这里 w 就实现了 Writer interface
	fmt.Fprintf(w, "count is %d\n", h.n)
}

本地buffer 操作

// 读取数据到本地buffer中
w := bufio.NewWriter(os.Stdout)
fmt.Fprint(w, "Hello, ")
fmt.Fprint(w, "world!")
w.Flush() // Don't forget to flush!

编解码操作

// 这里原型为
// func NewEncoder(enc *Encoding, w io.Writer) io.WriteCloser
// 这里传入的w参数需要实现 Writer interface
encoder := base64.NewEncoder(base64.StdEncoding, os.Stdout)
encoder.Write(input) // 这里最终会调用 w.Write([]byte) 方法

字符串操作

以 strings.NewReader 为例子,它实现了  io.Reader, io.ReaderAt, io.Seeker, io.WriterTo, io.ByteScanner, io.RuneScanner 等 interface, 实现了从字符串中的快速高效的读写:

r1 := strings.NewReader("first reader\n")
buf := make([]byte, 8)
// 这里 r1 实现了 Reader interface
if _, err := io.CopyBuffer(os.Stdout, r1, buf); err != nil {
    log.Fatal(err)
}

## pipe 以及 ioutil

ioutil 这个包从名字就很好理解,其中常用的就:

func ReadAll(r io.Reader) ([]byte, error)

func ReadFile(filename string) ([]byte, error)

func WriteFile(filename string, data []byte, perm os.FileMode) error

这三个函数。 pipe就很有意思了, pipe 的两端分别是一个reader 和 writer, 你可以理解为一个生产者和消费者。不过他们底层共享一个数据结构pipe。

func Pipe() (*PipeReader, *PipeWriter) {
		p := &pipe{
			wrCh: make(chan []byte),
			rdCh: make(chan int),
			done: make(chan struct{}),
		}
		return &PipeReader{p}, &PipeWriter{p}
	}

在读写的过程中, 读写的交流主要是靠 wrCh 和 **rdCh, **你可以看到他们之间并没有通过共享一个buffer 然后各自加锁来实现共享的, 而是通过channel来实现的共享。

Do not communicate by sharing memory; instead, share memory by communicating.

感觉 pipe 的用法就是 对上面这句 “设计哲学”最好的体现。 下一小节源码共享中会更多的分析 Pipe。

源码分析

源码分析这里选择两个函数一个是io.copyBuffer 函数,一个是pipe 操作。

io.copyBuffer

// 从 src 拷贝所有读到的数据到dst
// copyBuffer is the actual implementation of Copy and CopyBuffer.
// if buf is nil, one is allocated.
func copyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) {
	// If the reader has a WriteTo method, use it to do the copy.
	// Avoids an allocation and a copy.
	if wt, ok := src.(WriterTo); ok {
		return wt.WriteTo(dst)
	}
	// Similarly, if the writer has a ReadFrom method, use it to do the copy.
	if rt, ok := dst.(ReaderFrom); ok {
		return rt.ReadFrom(src)
	}
   // 如果buf 没有指定,申请一个临时的 buf // 用于循环从src中拷贝数据到dst
	if buf == nil {
		size := 32 * 1024
		if l, ok := src.(*LimitedReader); ok && int64(size) > l.N {
			if l.N < 1 {
				size = 1
			} else {
				size = int(l.N)
			}
		}
		buf = make([]byte, size)
	}
    // 循环拷贝数据
	for {
		nr, er := src.Read(buf)
		if nr > 0 {
			nw, ew := dst.Write(buf[0:nr])
			if nw > 0 {
				written += int64(nw)
			}
			if ew != nil {
				err = ew
				break
			}
			if nr != nw {
				err = ErrShortWrite
				break
			}
		}
		if er != nil {
			if er != EOF {
				err = er
			}
			break
		}
	}
	return written, err
}

pipe 操作

func (p *pipe) Write(b []byte) (n int, err error) {
	select {
	case <-p.done:
		return 0, p.writeCloseError()
	default:
		p.wrMu.Lock() // 保证多个写的数据不会发生错乱,保证串行写
		defer p.wrMu.Unlock()
	}

	for once := true; once || len(b) > 0; once = false {
		select {
		case p.wrCh <- b: // 往channel 中写数据
			nw := <-p.rdCh // 等等读端 读走数据
			b = b[nw:]
			n += nw
		case <-p.done: // 安全关闭读写
			return n, p.writeCloseError()
		}
	}
	return n, nil
}
func (p *pipe) Read(b []byte) (n int, err error) {
	select {
	case <-p.done:
		return 0, p.readCloseError()
	default:
	}

	select {
	case bw := <-p.wrCh: // 收到数据
		nr := copy(b, bw) // 读取
		p.rdCh <- nr // 通知写侧, 我读走了多少数据
		return nr, nil
	case <-p.done: // 正常关闭处理
		return 0, p.readCloseError()
	}
}

从上面可以看出读写两边速度不一致是完全可以接受的,多个写都串行执行的。

在 nsq 中的使用

之前系统的看过几遍 nsq 中的代码,而且github 上 start 也有17.4k, 相信这里你也能看到golang 各种使用技巧。

  1. ./nsqd/protocol_v2.go 中处理从客户端收到的请求包
// 处理 客户端发送的Pub请求
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
  	messageBody := make([]byte, bodyLen)
    // 读取数据包
	_, err = io.ReadFull(client.Reader, messageBody) 
}
  1. nsqd/message.go 中将消息写入到buffer中
func (m *Message) WriteTo(w io.Writer) (int64, error) {
		n, err := w.Write(buf[:])
}
func writeMessageToBackend(buf *bytes.Buffer, msg *Message, bq BackendQueue) error {
		buf.Reset()
		_, err := msg.WriteTo(buf)
	}
  1. diskqueue.go 将消息写道磁盘上
// writeOne performs a low level filesystem write for a single []byte
// while advancing write positions and rolling files, if necessary
func (d *diskQueue) writeOne(data []byte) error {
	var err error
	d.writeBuf.Reset()
	err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)

}

io 操作 实在是太多了, 这里就不一一列举了。

一周一命令之ip

ip command 可以作为一个ifconfig的最佳替代作为物理层和链路层的配置工具,ip更强大的应该是他可以制定命名空间等等(这些应该在docker环境或者虚拟化里面会经常遇到), 我可以通过ip cmd来控制ip层,链路层以及路由表相关的配置。

以是一个 ifconfig/ip两个工具包之间的对应关系

Old command (Deprecated) New command
ifconfig -a ip a
ifconfig enp6s0 down ip link set enp6s0 down
ifconfig enp6s0 up ip link set enp6s0 up
ifconfig enp6s0 192.168.2.24 ip addr add 192.168.2.24/24 dev enp6s0
ifconfig enp6s0 netmask 255.255.255.0 ip addr add 192.168.1.1/24 dev enp6s0
ifconfig enp6s0 mtu 9000 ip link set enp6s0 mtu 9000
ifconfig enp6s0:0 192.168.2.25 ip addr add 192.168.2.25/24 dev enp6s0
netstat ss
netstat -tulpn ss -tulpn
netstat -neopa ss -neopa
netstat -g ip maddr
route ip r
route add -net 192.168.2.0 netmask 255.255.255.0 dev enp6s0 ip route add 192.168.2.0/24 dev enp6s0
route add default gw 192.168.2.254 ip route add default via 192.168.2.254
arp -a ip neigh
arp -v ip -s neigh
arp -s 192.168.2.33 1:2:3:4:5:6 ip neigh add 192.168.3.33 lladdr 1:2:3:4:5:6 dev enp6s0
arp -i enp6s0 -d 192.168.2.254 ip neigh del 192.168.2.254 dev wlp7s0

ip 出来可以配置addr/link/route之外还可以配置一下的object:

OBJECT := {addrlabel | rule | neigh | ntable | tunnel | tuntap | maddress | mroute | mrule | monitor | xfrm | netns | l2tp | tcp_metrics | token | macsec }

这些工具在实际中也没有怎么使用过,所以就不在这里展开。ip route可以仔细研究下里面的内容,看看一个包是怎么路由的:

➜  ip route
default via 192.168.1.1 dev wlp3s0 proto dhcp metric 600 
blackhole 10.244.41.0/26 proto bird 
10.244.41.19 dev cali3134e2d5dc9 scope link 
10.244.41.20 dev calic832acbf629 scope link 
10.244.41.21 dev cali6ece678f012 scope link 
169.254.0.0/16 dev virbr0 scope link metric 1000 linkdown 
172.17.0.0/16 dev docker0 proto kernel scope link src 172.17.0.1 linkdown 
192.168.1.0/24 dev wlp3s0 proto kernel scope link src 192.168.1.11 metric 600 
192.168.122.0/24 dev virbr0 proto kernel scope link src 192.168.122.1 linkdown 

深入理解pod

image

为啥需要pod

首先我们来看下什么是容器, 在docker官方网站上是这样解释的:

Containers are an abstraction at the app layer that packages code and dependencies together.
Multiple containers can run on the same machine and share the OS kernel with other containers,
each running as isolated processes in user space. Containers take up less space than VMs
(container images are typically tens of MBs in size), can handle more applications and require fewer VMs and Operating systems.

从上面的定义可以看出每个container都是独立(可以理解为每个服务)的, 在实际使用中我们也是通过namepace来做隔离, cgroup来做限制, rootfs做文件系统”. 但现实生活中总是复杂的, 你总会发现我们的每个服务并不是孤伶伶的存在的, 它最会和一些其他服务发生着点关系. k8s中为了解决这个复杂的问题提供了pod这样的一个逻辑概念, 在k8s中pod是一个最小的调度单元, 至于为啥在k8s中最小的调度单元是pod而不是container, 可以想想一下

现在有两个这样的node:

 NodeA: 1.5G 内存

NodeB:  1.25 G 内存

我现在有两个超亲密的 container, containerA需要1G内存, containerB需要0.5 G内存, 如果以container为最小单元就会发生一个尴尬的事情, 需要containerA被调度到NodeB上了, 然后再调度的时候发现containerB无法调度到NodeB上. 在k8s中通过pod为最小调度单元来解决这个问题.

Pod的内部工作原理

前面讲过了pod的目的是将一些“超亲密”关系的container放到一起, 他们之间的通信肯定是超级频繁的(本地文件共享, IPC通信, localhost通信), 由于容器之间是通过linux namespace 和 cgroup隔离开的, 怎么样让他们之间高效的共享网络资源和存储是pod所要解决的核心问题.

通过pause container 实现共享网络(一个pod中的多个container在同一个naemspace里这样可以使用localhost通信)

image

image

pod yaml 定义

下面是一个很全的pod yaml定义文件,在实际使用中并不是所有的字段都需要定义。一个tab就是一个层级关系。

apiVersion: v1            //版本
kind: pod                 //类型,pod
metadata:                 //元数据
  name: String            //元数据,pod的名字
  namespace: String       //元数据,pod的命名空间
  labels:                 //元数据,标签列表
    - name: String        //元数据,标签的名字
  annotations:            //元数据,自定义注解列表
    - name: String        //元数据,自定义注解名字
spec:                     //pod中容器的详细定义
  containers:             //pod中的容器列表,可以有多个容器
  - name: String
    image: String         //容器中的镜像
    imagesPullPolicy: [Always|Never|IfNotPresent]//获取镜像的策略
    command: [String]     //容器的启动命令列表(不配置的话使用镜像内部的命令)
    args: [String]        //启动参数列表
    workingDir: String    //容器的工作目录
    volumeMounts:         //挂载到到容器内部的存储卷设置
    - name: String
      mountPath: String
      readOnly: boolean
    ports:                //容器需要暴露的端口号列表
    - name: String
      containerPort: int  //容器要暴露的端口
      hostPort: int       //容器所在主机监听的端口(容器暴露端口映射到宿主机的端口)
      protocol: String
    env:                  //容器运行前要设置的环境列表
    - name: String
      value: String
    resources:            //资源限制
      limits:
        cpu: Srting
        memory: String
      requeste:
        cpu: String
        memory: String
    livenessProbe:         //pod内容器健康检查的设置
      exec:
        command: [String]
      httpGet:             //通过httpget检查健康
        path: String
        port: number
        host: String
        scheme: Srtring
        httpHeaders:
        - name: Stirng
          value: String 
      tcpSocket:           //通过tcpSocket检查健康
        port: number
      initialDelaySeconds: 0//首次检查时间
      timeoutSeconds: 0     //检查超时时间
      periodSeconds: 0      //检查间隔时间
      successThreshold: 0
      failureThreshold: 0
      securityContext:      //安全配置
        privileged: falae
    restartPolicy: [Always|Never|OnFailure]//重启策略
    nodeSelector: object    //节点选择
    imagePullSecrets:
    - name: String
    hostNetwork: false      //是否使用主机网络模式,默认否
  volumes:                  //在该pod上定义共享存储卷
  - name: String
    meptyDir: {}
    hostPath:
      path: string
    secret:                 //类型为secret的存储卷
      secretName: String
      item:
      - key: String
        path: String
    configMap:             //类型为configMap的存储卷
      name: String
      items:
      - key: String
        path: String

pod 的配置管理

在k8s中一个容器可以理解成为一个可执行程序,将配置直接写死在代码里并不是一很好的实战策略,而且同一个image在不同的环境中不同的配置方式,所以k8s中依赖注入的方式陪container提供具体的配置文件。k8s中配置分为两种一种是非敏感信息的配置,一直是敏感信息的配置。

非敏感信息配置一般采用configMap的方式, 敏感信息一般采用secret的方式进行配置。然后在container的spec中通过环境变量或者是文件挂载的方式进行访问。针对secret需要注意的时候输入需要base64一下。

在实际使用中感觉有个缺点就是configMap中不能应用secret环境变量,在我司的应用配置管理里在应用配置里敏感信息是用一个魔法变量代替的,然后在部署的时候自动会替换为类似secret的东西。然后在应用程序里可以通过config获取。在k8s中这两个信息需要用两种方式来获取略感麻烦。

启动脚本?? init container??可以解决这个问题?从secret总获取敏感信息然后写入config中?

downward api

由于namespace level的配置都是在pod这一层,有些场景中可能需要container获取pod的一些属性,比如pod的name, ip等信息这里也是可以通过注入的方式进行获取, 这里依然是两种方式,

文件挂载方式和环境变量方式。目前支持以下字段的注入:

  • metadata.name - the pod’s name
  • metadata.namespace - the pod’s namespace
  • metadata.uid - the pod’s UID, available since v1.8.0-alpha.2
  • metadata.labels[''] - the value of the pod’s label  (for example, metadata.labels['mylabel']); available in Kubernetes 1.9+
  • metadata.annotations[''] - the value of the pod’s annotation  (for example, metadata.annotations['myannotation']); available in Kubernetes 1.9+

也可以从container中获取一些信息:

  • A Container’s CPU limit
  • A Container’s CPU request
  • A Container’s memory limit
  • A Container’s memory request
  • A Container’s ephemeral-storage limit, available since v1.8.0-beta.0
  • A Container’s ephemeral-storage request, available since v1.8.0-beta.0

说实话我没想明白在pod中获取container的这些信息的应用场景是什么。

<<Linux性能优化实战>> 网络篇 阅读笔记

相对于CPU,内存,在实际开发中碰到的最多的应该还是和网络相关的问题,经常听到的一句话是哪里哪里访问又出问题啦,如果是非 http 的协议的话处理起来跟棘手了(http有完善的生态系统)。

  • 大概第一件事情就是 dig 一下
  • 然后 ping 一下
  • 如果是tcp的话可以 telnet 上去看下端口访问是否ok
  • 再进一步的话就是抓包分析了

接下来的两周也会是按照这个展开,再次系统的去认识Linux 网络相关的知识

目录

第三十三讲 关于 Linux 网络,你必须知道这些(2020.6.23)
第三十五讲 基础篇:C10K 和 C1000K 回顾(2020.6.27)
第三十六讲 套路篇:怎么评估系统的网络性能(2020.06.27)
第三十七讲 案例篇:DNS 解析时快时慢,我该怎么办(2020.06.28)
第三十八讲 怎么使用 tcpdump 和 Wireshark 分析网络流量(2020.06.28)
第三十九讲 案例篇:怎么缓解 DDoS 攻击带来的性能下降问题(2020.06.30)
第四十讲 案例篇:网络请求延迟变大了,我该怎么办(2020.07.05)
第四十一讲 案例篇:案例篇:如何优化 NAT 性能(2020.07.05)
第四十三讲 套路篇:网络性能优化的几个思路(2020.07.05)

golang 中各种io花式读写方法

在上一篇里我们详细的看了io 包中各种io的操作方法,是不是有点蒙,简直就是个人*操作。这里整理一下从 src 拷贝到dst的N中方法。

这里假设 dst 实现了Write 方法,src 实现了 Read 方法

io.Copy系

io.Copy(dst Writer, src Reader)

从 src 中拷贝数据到 dst,直到遇到 EOF或者错误为止。 需要注意的一点是读到EOF不会返回错误。

他底层其实是调用下面的 io.copyBuffer, 不过每次从 src-->dst 搬运的buffer是系统自动分配的32×1024 bytes。

	src1 := bytes.NewBufferString("this is a test for io.Copy")
	dst1 := bytes.NewBuffer([]byte{})
	io.Copy(dst1, src1)
	fmt.Println("dst after io.Copy = ", dst1.String(

io.CopyBuffer(dst Writer, src Reader, buf []byte)

原理和上面函数一样,不同点在于中间转运 buffer 可以自己指定。

注意这里的 buf 大小不能为0

	src2 := bytes.NewBufferString("this is a test for io.CopyBuffer")
	dst2 := bytes.NewBuffer([]byte{})
	buf := make([]byte, 1024)
	io.CopyBuffer(dst2, src2, buf)
	fmt.Println("dst after copyBuffer = ", dst2.Strin

io.CopyN((dst Writer, src Reader, n int64)

从 src 中拷贝 n bytes的和数据到 dst。这里很巧妙的使用了LimitReader

	src3 := bytes.NewBufferString("this is a test for io.CopyN")
	dst3 := bytes.NewBuffer([]byte{})
	io.CopyN(dst3, src3, 5)
	fmt.Println("dst after copyN = ", dst3.Stri

上面几个函数的共同点在于他们底层都是调用:

io.copyBuffer(dst Writer, src Reader, buf []byte)

如果 src 实现了WriteTo 方法或者 dst 实现了 ReadFrom 就直接使用
src.WriteTo(dst) 或者 dst.ReadFrom(src)

Read 系

这里系列的方法前提是你src 必须实现了 Read 方法

直接调用 src 的Read 方法

	src2 := bytes.NewBufferString("this is a test for directly Read")
	dst2 := make([]byte, 10)
	src2.Read(dst2)
	fmt.Println("dst after copyBuffer = ", string(dst2))

io.ReadFull(r Reader, buf []byte

从 r 中读取 len(buf) 大小的数据到 buf,这里需要注意一下buf 和正式读到的数据大小之间的关系

读到的数据小于len(buf) 返回读到的数据 + 错误

err == nil 当且仅当 len(buf) == read size

数据没有读完但是遇到一个EOF 返回 ErrUnexpectedEOF

	src3 := bytes.NewBufferString("this is a test")
	dst3 := make([]byte, 15)
	n, err = io.ReadFull(src3, dst3)
	fmt.Println("dst after readFull = ", string(dst3), "n = ", n, "err = ", err)

io.ReadAtLeast(r Reader, buf []byte, min int)

从 r 中至少读取min bytes的数据到buf,错与信息基本和上面一样

如果min > len(buf) 返回 short buffer‘

err == nil 当且仅当 read size >= min

SectionReader/LimitReader etc

和普通的读没啥区别,不过这里通过选择和limit来保护的内存(防止恶意的请求把内存打爆)

ioutil系

ReadAll(r io.Reader) ([]byte, error)

这里系就这个一个方法,主要用来从http或者文件中读取数据(注意文件不要太大)。从r 中读取直到EOF,所以EOF并不会当成错误来返回。

三个派系的选择

三类函数都是从src中读取数据到dst, 他们的不同点在于对EOF的处理, 在io.Copy系和ioutil系中EOF只是读取结束的值, 读到EOF并不返回EOF。io.Read 读到EOF会返回EOF错误。

一周一命令之pidstat

pidstat - Report statistics for Linux tasks.

从名字是上就已经很明确了pid统计,pidstat可以统计task各个层面的东西。

  • -d 可以统计io层面数据(单位时间内读写大小已经iodelay(block在io上的clock tick))
    image
  • -r 统计page fault/ memory 相关的信息
    image
  • -u 统计cpu相关的信息
    image
  • -v 统计task包含的线程数和打开的fd(排查fd泄漏比较有用)
    image
  • -w cpu 单位时间内上下文切换统计(排查大量进程导致的负载升高)
    image

常用的也就上面几个选项,然后就是一些共有的选项

  • U 指定用户, -p 制定pid, -t 显示tid/tgid信息, -T {TASK|CHILD|ALL} 进行不同维度的进行统计

一周一命令之top

此乃linux性能分析里的 "扛把子", 登录到一太机器上第一件事情就是top看下系统的整体运行情况。整个输出分为三个区域
image
最上面部分为系统 Summary Area 这里包含了系统整体的情况,中间为Fields Header, 下面为 Task Area,
下面让我来对每一个区域里的每一行细细给你道来。
Summary Area
summary就是系统整体运行状况的一个描述

Summary 第一行

top - 21:49:50 up 39 days, 1:07, 1 user, load average: 4.57, 4.26, 3.22

第一行里包含了系统当前的 时间, 开机运行的天数,已经系统的平均负载。其中这里的平均负载最为关键,也是最容易理解错误的一个指标, 这里的平均负载和cpu负载没有直接的关系,在和这里系统的平均负载定义为单位时间内可运行状态不可中断状态的进程数(也就是单位时间内R+D状态的进程数)。
一般系统负载大于CPU个数的70% 的时候就表示系统负载出问题了需要引起关注。 4.57, 4.26, 3.22 分别表示1分钟, 5分钟,15分钟的系统负载情况,他们的关系如下

  • 1 分钟 == 5 分钟 == 15 分钟 说明系统负载很稳定
  • 1分钟 > 5 分钟 > 15 分钟 说明系统负载在升高
  • 1分钟 < 5 分钟 < 15 分钟 说明系统负载在降低

从上面的定义中我们已经知道了系统平均负载的含义,然后我们可以反向去分析一下什么样的情况下会引起系统负载升高。

  • 有大量的 CPU密集型操作(Running 状态的进程增多)
  • 有大量的 IO密集型操作(不可中断状态的进程在增多)
  • 有大量的进程创建系统无法及时调度(等待CPU的进程在增多)

一般定位问题方式都是先用mpstat 定位到具体是以上那种case引起的负载升高, 然后是用 pidstat 分析具体是那个进程引起的.
image

image

Summary 第二行

Tasks: 436 total, 2 running, 350 sleeping, 1 stopped, 2 zombie

这一样描述了系统进程的状态统计信息,-H 可以切换到thread mode

Threads: 1915 total, 2 running, 1835 sleeping, 1 stopped, 0 zombie

Summary 第三行

%Cpu(s): 9.2 us, 26.4 sy, 0.0 ni, 46.5 id, 13.1 wa, 0.0 hi, 4.9 si, 0.0 st

两次刷新之间CPU的运行情况(可以和第一行结合着看) -1 可以查看每个核上的状态
image

Summary 第四五行

KiB Mem : 16292024 total, 1031100 free, 7575040 used, 7685884 buff/cache
KiB Swap: 0 total, 0 free, 0 used. 6644032 avail Mem

这里就是系统内存和swap的使用情况了,-E 可以在内存的各种单位之间切换

MiB Mem : 15910.18+total, 745.973 free, 7732.816 used, 7431.391 buff/cache
MiB Swap: 0.000 total, 0.000 free, 0.000 used. 6363.746 avail Mem

GiB Mem : 15.537 total, 0.699 free, 7.571 used, 7.267 buff/cache
GiB Swap: 0.000 total, 0.000 free, 0.000 used. 6.187 avail Mem

-m 可以查看使用的占比情况
image

至此整个系统的summary area 就整理完了。

Fields Header

image
这里其实定义了下面展示task的那些属性,默认是上面这些也可以 -F进行定制
image
可以考虑将page fault+nTH加到观察项里。
image

接下来即使taks区域了

Task Area

image
这里就是每个进程的状态,一些常用技巧

  • -i 隐藏 idle 的task
    image

  • -c 显示命令行的启动参数
    image

  • -P/M/N/T 分别按照cpu/物理内存/pid/运行时间进行排序

  • -F 也可以指定排序列
    image

  • -R 上面排序从大到小<---------->从小到达的转换

  • -x 高亮当前的排序列
    image

golang io包阅读笔记

最近主语言又切回了golang上来了, 好久没写了对基本都有点生疏了. 然后就着了google官方出的<>快速的扫了一遍. 看到最后的时候作者讨论了what's NEXT.

                      "For example we might take a look at the source code to 
                       the io/ioutil library available at:
                       http://golang.org/src/pkg/io/ioutil/ioutil.go 
                       Read the code slowly and deliberately. Try to under- stand 
                       every line and read the supplied comments."

然后就像顺着作者的思路来读一下源码看看大神们是怎么写代码的.
首先就从 https://github.com/golang/go/blob/master/src/io/io.go 出发, 这个包确实很适合刚开始的时候读. 看他import的东西你就知道了

import (
	"errors"
)

这里只import了一个包, 你完全不用文件切来切去了. 这种短小精悍的文件最适合复习一下语言点了.
每个语言的io包都是将语言用到极致的地方, 学习c++的时候就在不断的看io的继承结构, 学习java的时候也在看这些东西. 你把io包吃透了的话语言层面的基本上就通了, 剩下的就是你要话费更多的时间去实践了.
其中最有意思的莫过于:

func copyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) 

这个函数了.

if wt, ok := src.(WriterTo); ok {
		return wt.WriteTo(dst)
	}
	// Similarly, if the writer has a ReadFrom method, use it to do the copy.
	if rt, ok := dst.(ReaderFrom); ok {
		return rt.ReadFrom(src)
	}
	if buf == nil {
		size := 32 * 1024
		if l, ok := src.(*LimitedReader); ok && int64(size) > l.N {
			if l.N < 1 {

这里又两个很妙的地方一个就是

src.(WriterTo)  dst.(ReaderFrom) 

这里首先判断了src活着dst是否为空, 如果不为空的话再去确认他是不是实现了WriteTo/ReadFrom的方法,简直就是一举两得.
还有一个地方是

src.(*LimitedReader)

之类sr测试一个Reader的interface的类型, 但是通过src.(*LimitedReader)就可以判断它是不是具体的LimitReader的类型, 是不是比reflect简单很多.

一周一package之context包

context 包是用来干啥的

这包是用来干啥的可能直接看官方的文档最合适:

Package context defines the Context type, which carries deadlines, cancellation signals,
and other request-scoped values across API boundaries and between processes.

从这里看出 context包主要用来在跨api,goroutine传递消息(deadline, cancellation signals, and other scoped values)。在golang 一般业务逻辑和底层是分开的,他们属于两个goroutine,context可以个更优雅的在他们之间传播消息。

可以简单看下Context 包含的内容。

type Context interface {
	// 获取一个有deadline contxt 的过期时间,如果该context 没有deadline
    // ok 为false
	Deadline() (deadline time.Time, ok bool)
	
    // 一个只读 channel,等待 context结束(被cancel,达到deadline)
	Done() <-chan struct{}
    // 如果Done 没有比close,返回nil,否则返回被close的原因
    // canceled for context was canceled
    // DeadlineExceeded for context deadline passed
	Err() error
  
    // 携带的数据
	Value(key interface{}) interface{}
}

WithCancel

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

返回父 context的一个拷贝,以下两种情况下Done channel 会被关闭

  1. 返回的cancel方法被主动调用
  2. 父context的Done channel被关闭

由于Context 是跨goroutine 之间传递的,这里只要父进程或者父进程的父进程被cancel了,新的context 就会收到 Done的消息。下面来看一下官方的一个例子:

package main

import (
	"context"
	"fmt"
)
func main() {
	// gen generates integers in a separate goroutine and
	// sends them to the returned channel.
	// The callers of gen need to cancel the context once
	// they are done consuming generated integers not to leak
	// the internal goroutine started by gen.
	gen := func(ctx context.Context) <-chan int {
		dst := make(chan int)
		n := 1
		go func() {
			for {
				select {
				case <-ctx.Done(): // 等待 context 被cancel
					return // returning not to leak the goroutine
				case dst <- n:
					n++
				}
			}
		}()
		return dst
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // cancel when we are finished consuming integers

	for n := range gen(ctx) {
		fmt.Println(n)
		if n == 5 {
                      // 跳出for 循环结束,之后会执行上面的defer cancel,
                       // 然后在gen 就会收到context Done channel被关闭的通知
			break 		
                }
	}
}

WithDeadline

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)

返回父 Context 的一个拷贝,在以下几种情况下Done channel会被关闭

  1. deadline 过期
  2. cancel 函数被调用
  3. 父 Context 的Done channel 被关闭

这里需要注意的是如果 父 Context 本身也是一个 WithDeadline的情况

  1. 如果 父 Context 的 deadline 比d要早直接返回父 Context
  2. 如果 d已经过期,设置子context的error 为 DeadlineExceeded

WithTimeout

WithDeadline的变体, 等于 WithDeadline(parent, time.Now().Add(timeout))

WithValue

在context 中附加(key,value) pair

常见使用方法

WithCancel

在处理http请求的时候,如果这个请求在业务逻辑处理完成之前,底层socket被关闭的时候取消上面的业务逻辑(以此再去做业务逻辑已经没啥意义了)。

// 请求处理函数
func handler(r *http.Request, w *http.ResponseWriter) {
	ctx, cancel := newCtx2(context.TODO(), w)
	defer cancel()
	doSomeThing(ctx, r) // 业务逻辑处理函数
}
// 构建一个新的 context
func newCtx2(ctx context.Context, w *http.ResponseWriter) (context.Context, context.CancelFunc) {
	var cancel context.CancelFunc
	if cn, ok := w.(http.CloseNotifier); ok {
        // 底层socket 被关闭
		ctx, cancel = context.WithCancel(ctx)
		notifyChan := cn.CloseNotify()
		go func() {
			select {
			case <-notifyChan:
				cancel() // cancel context
			case <-ctx.Done():
			}
		}()
	} else {
		cancel = nilCancel
	}
	return ctx, cancel
}
func nilCancel() {}

WithTimeout

处理 http client 请求超时处理, 这里请求 http://www.google.com,如果超时返回 ctx.Err() 返回 deadline exceed,否则打印success 信息

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.TODO(), time.Millisecond*2) 
	defer cancel()
	done := make(chan struct{})
	go doRequest(ctx, done) // 业务请求
	select {
	case <-ctx.Done():// 超时
		d, _ := ctx.Deadline()
		fmt.Println("err = ", ctx.Err(), d)
	case <-done: // 正常处理结束
		fmt.Println("successful handle the request")
	}
}
func doRequest(ctx context.Context, done chan struct{}) {
	req, _ := http.NewRequest("GET", "http://www.google.com", nil)
	req.WithContext(ctx)
	http.DefaultClient.Do(req)
	close(done)
}

这里只罗列了这两种简单的用法,跨goroutine, 快api的只要有cancel, 处理timeout逻辑的都可以使用这种逻辑去处理。

内部实现原理

context 包的实现主要依赖 timerCtx 和 cancelCtx 这两个结构体, 这里也主要分析他俩的代码。

Cancel Context

下面是内部 concelCtx的定义

type cancelCtx struct {
	Context // 父 Context
	mu       sync.Mutex            // protects following fields
	done     chan struct{}         // created lazily, closed by first cancel call
    // 存放该 context 派生出来的子context
	children map[canceler]struct{} // set to nil by the first cancel call
	err      error                 // set to non-nil by the first cancel call
}

下面就以构建一个 WithCancel 的context 来看下具体的实现代码

// 构建一个cancel ctx 
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	if parent == nil { // 确保父 contxt 不为空
		panic("cannot create context from nil parent")
	}
	c := newCancelCtx(parent)  // 产生一个上面定义的cancelCtx
	propagateCancel(parent, &c) // 将上面派生出来的 contxt 放到父 context 对应的 children 字段中
	return &c, func() { c.cancel(true, Canceled) }
}

// 给派生出来的 context 安排一个合适的位置, 以便父 context 被取消的时候能够通知到children
func propagateCancel(parent Context, child canceler) {
	done := parent.Done()
	if done == nil {
		return // parent is never canceled
	}
	select {
	case <-done:
		// parent is already canceled
		child.cancel(false, parent.Err())
		return
	default:
	}

    // 查找 parent 对应的 cancelCtx,如果父 context 是一个可 cancel 的context,将 child 放到 parent.children 中(父context cancel的时候需要通知到子 context)
	if p, ok := parentCancelCtx(parent); ok { // 
		p.mu.Lock()
		if p.err != nil {
			// parent has already been canceled
			child.cancel(false, p.err)
		} else {
			if p.children == nil {
				p.children = make(map[canceler]struct{})
			}
			p.children[child] = struct{}{} // 将 child 添加到
		}
		p.mu.Unlock()
	} else { // 父 context 不是一个cancel context
		atomic.AddInt32(&goroutines, +1)
        // 在这里可以看到 如果父 context 不是一个可 cancel 的context的时候,是单独启一个goroutine 等等 context 结束的。这也是为啥一定要调用 defer cancel()的目的,不然就会发生goroutine 泄漏。
		go func() {
			select {
			case <-parent.Done():
				child.cancel(false, parent.Err())
			case <-child.Done():
			}
		}()
    }
}

下面来看下一个concel 的处理过程

// CancelFunc 的原始定义
type CancelFunc func()
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	if err == nil {
		panic("context: internal error: missing cancel error")
	}
	c.mu.Lock()
	if c.err != nil { // 第一次调用 cancel 之后 c.err 就会设置为 err,
这里如果 c.err已经被设置了就说明该context 已经被cancel了
		c.mu.Unlock()
		return // already canceled
	}
	c.err = err
	if c.done == nil {
		c.done = closedchan
	} else {
		close(c.done) // 通知单独起的 goroutine, 该context 被cancel
	}
    // 通知所有的派生 context,这里递归的调用cancel 每个一派生的context
	for child := range c.children {
		// NOTE: acquiring the child's lock while holding parent's lock.
		child.cancel(false, err)
	}
	c.children = nil
	c.mu.Unlock()

    // 将自己从 父 context 中移除
	if removeFromParent {
		removeChild(c.Context, c)
	}
}

至此 cancelContext 的代码也就分析完了,整体过程是这样的
创建:

  1. 第一次需要派生一个cancel context的时候会起一个goouritne去等到 Done 的channel
  2. 如果已经是一个 cancel context 了则将将 派生出的 context 放到 parent的children中

cancel:

a. 如果已经被cancel 直接返回

b. close(c.done) 通知上面创建中a 起的goroutine

c. cancel 所有的children, 以及 children cancel 自己的children

Deadline Context

把上面的 cancel context 坑透之后再去看 deadline context 就简单多了, 下面一个deadline contxt 底层 timerCtx 的定义:

type timerCtx struct {
	cancelCtx // timer context的好多方法都会委托给cancelCtx
	timer *time.Timer // 对应 timer
	deadline time.Time // deadline 时间点
}

下面来看下一个timerContext 的构造过程:

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
    // parent 是一个deadlinie context 并且,parent.deadline > d
    // 直接返回 父 context(因为父 context 被取消的时候,对应的所有子孙都会被取消,所以 child.deadline > parent.deadline 就没意义了)
	if cur, ok := parent.Deadline(); ok && cur.Before(d) {
		// The current deadline is already sooner than the new one.
		return WithCancel(parent)
	}
    // 构造 timerCtx
	c := &timerCtx{
		cancelCtx: newCancelCtx(parent),
		deadline:  d,
	}
    // 和cancel context 一样,给新派生出来的context 安排一个合适的位置,以便父 context 被cancel 的时候通知到个子 context 
	propagateCancel(parent, c)
	dur := time.Until(d)
	if dur <= 0 { // 已经过期
        // 注意一下由于上面已经c 放到 parent.children 或者,单独其一个goroutine了, 这里需要回退掉
		c.cancel(true, DeadlineExceeded) // deadline has already passed
		return c, func() { c.cancel(false, Canceled) }
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.err == nil { // 没有被cancel 或者deadline excedd
		c.timer = time.AfterFunc(dur, func() {
			c.cancel(true, DeadlineExceeded) // dur 之后自动cancel
		})
	}
	return c, func() { c.cancel(true, Canceled) }
}

下面也看下 deadline context 被取消的过程

func (c *timerCtx) cancel(removeFromParent bool, err error) {
	c.cancelCtx.cancel(false, err) // 委托给 cancelCtx
	if removeFromParent {
		// Remove this timerCtx from its parent cancelCtx's children.
		removeChild(c.cancelCtx.Context, c)
	}
    // 取消timer
	c.mu.Lock()
	if c.timer != nil {
		c.timer.Stop()
		c.timer = nil
	}
	c.mu.Unlock()
}

避坑指南

给你一把瑞士军刀,如果用不好可能很容易伤到自己,context 也是,在业务中有些东西也要避免滥用。

  1. 如果不是需要跨多个api之间传递的参数就不要带到context 中, 能直接使用参数形式传递的就尽量使用参数形式传递。
  2. api里面不要保存传进来的context,只应该让他在api之间流转
  3. 不要传递一个nil 给context
  4. 记得 **defer cancel() **去释放资源
  5. context 按照约定放到函数的第一个参数
func DoSomething(ctx context.Context, arg Arg) error {
 		// ... use ctx ...
}
  1. 一定要理解context中 chain的概念,一个context可以不断的派生出contxt, 只要一个conext 被cancel,他派生出来的子,子的子 等等都会被关闭。

参考:

  1. https://golang.org/pkg/context/
  2. https://blog.golang.org/context
  3. https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts/

redis 源码学习计划

What is redis

什么是 redis,redis 能干什么?从官网的一个简介来了解他。
image

从上面官网的介绍可以大概了解一下 redis 能提供什么样的能力。2019准备好好
研究一下redis,将上面的红色条目一个个展开,深入的去理解里面的设计(写一系列的文章
来总结上面的每一个点)。看了下 redis 的源码总用也就87455行,是一个不大也不小的项目
如果从其中扩展开来的的话肯定能挖到很多的东西。主要还是依赖 <<Redis 设计与实现>>,跟着作者的思路一步步的往下走。

一周一package之sync

sync 包应该是我们最常用的一个包,里面包含了 WaitGroup, CondMutex

Once, Pool , RWMutex, SyncMap 等。说实话, 我是没弄明白为啥Pool 在sync包里,接下里就一个一个看下。

WaitGroup

WaitGroup 用来在main goroutine 中等待其创建的goroutine运行结束,其实类似 C 中的 wait, waitpid 两个系统调用。下面是一个简单的使用例子:

package main

import (
	"fmt"
	"sync"
)

func worker(wg *sync.WaitGroup, v int) {
	fmt.Println("v = ", v)
	wg.Done()
}
func main() {
	var wg sync.WaitGroup

	for i := 0; i < 5; i++ {
		wg.Add(1) // 增加Wait的数量
		go worker(&wg, i) // 子 routine中去处理
	}
	wg.Wait() // 等待所有的子 routine结束
}

输出:

➜  sync go run waitgroup.go        
v =  4
v =  0
v =  1
v =  2
v =  3

主要也就三个API: Add, Done, Wait

这里有一点要特别注意:

A WaitGroup must not be copied after first use.

也就是说waitgroup在函数间传递的时候不能 by value 必须要by pointer(WaitGroup 不可拷贝),worker的第一个参数必须是 *WaitGroup, 不然就会GG;

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc00009a018)
	/usr/local/go/src/runtime/sema.go:56 +0x39
sync.(*WaitGroup).Wait(0xc00009a010)
	/usr/local/go/src/sync/waitgroup.go:130 +0x65
main.main()
	/home/qx/proj/golang_weekly/sync/waitgroup.go:19 +0xbb
exit status 2

ok, 上面就是 WaitGroup 的常用模式, sync的包里的代码相对应用层的代码来说相对难理解一点,慢慢啃一下。看下上面一个panic是怎么报出来的。

首先来看下WaitGroup 内部数据结构的定义:

type WaitGroup struct {
	noCopy noCopy // 为了go vet 用来 check copylocks
	// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
	// 64-bit atomic operations require 64-bit alignment, but 32-bit
	// compilers do not ensure it. So we allocate 12 bytes and then use
	// the aligned 8 bytes in them as state, and the other 4 as storage
	// for the sema.
	state1 [3]uint32
}

从上面的定义可以看出, 主要通过sate1来维护WaitGroup的内部状态, state的结构如下:

|--32 bit(counter)--|--32 bit(waiter counter)--|---32 bit(信号量)--|

信号量主要用来在两个线程直接同步。

代码逻辑大概是这样:

  1. 每次 Add 的时候就去counter = counter +1
  2. 每次 Done 的时候就去counter = conter -1, 如果couter == 0 则释放信号量(通知waiter结束)
  3. 每次 Wait 的时候waiter_counter = waiter counter +1, 然后for loop去等待信号量的通知

当counter == 0 的时候就通过信号量通知等待的进程结束

代码逻辑大概就是这样, 不过这里有一个happen before 需要注意一下:

  1. Add must happened before Wait
  2. when WaitGroup reused, new Add must happened after all previous Wait have returned

Once

很简单的逻辑, 保证函数只执行一次, golang 最大的特点就是方便开发者,官方package里包含了各种常用的函数:

// Once is an object that will perform exactly one action.
type Once struct {
	m    Mutex
	done uint32
}
func (o *Once) Do(f func()) {
	if atomic.LoadUint32(&o.done) == 1 {
		return
	}
	// Slow-path.
	o.m.Lock()
	defer o.m.Unlock()
	if o.done == 0 {
		defer atomic.StoreUint32(&o.done, 1)
		f()
	}
}

需要注意的一点是 这里只保证 f()被执行到了, 中间panic啥的它是不负责的。

Pool

看到 pool 第一眼想到的就是各种连接池,在这里也不例外,这里的目的就是将一些临时的内存分配池化,较少系统的内存分配 + GC。这里有几点需要注意:

  1. pool 中未被使用的对象会在 GC 的时候收回
  2. pool 是线程安全的,可以被多个go routine 多次重复调用
  3. pool 的主要目的通过对象的复用减少内存的分配,降低 GC 的负担
  4. 由于每次都是调用 New 分配一个空间,所以 pool 中所有的对象大小都是一样的,大小不确定的对象就不太适合放到系统的 Pool 里
  5. 没有银弹,需要根据实际场景分析,是否需要 系统的pool,还是需要自己构建应用层的pool

最常用的场景的就是编解码的地方。

下面来看下代码, 这由于设计到go routine的调度相关的知识,相对之前的几个包难度要大很多。

以下是Pool的一个简单数据结构,每个P多有一个对应的private,这个private只有当但前的P可以访问。

然后每个P也有一个对应的shared list,当P上的private已经被某个goroutine 使用的时候就从P对应的shared list中取出一个object,否则就从别的P的shared list上取一个。如果再取不到就在堆上New一个。

image

这点感觉和CPU的设计很像每个Core 都有自己的cache(L1,L2),如果自己的cache放不下的时候就放在L3 中。

下面来看下Get:

func (p *Pool) Get() interface{} {
	if race.Enabled {
		race.Disable()
	}
	l := p.pin() // 获取当前P对于的PoolLocal
	x := l.private // 优先使用private
	l.private = nil
	runtime_procUnpin()
	if x == nil { // private 已经被别的 goroutine 使用了
		l.Lock() // 别的P也可以从当前P的shared上获取对象,所以这里需要加锁
		last := len(l.shared) - 1 // 从当前P所属的poolLocal->shared上分配
		if last >= 0 { // poolLocal->shared 还有空闲的
			x = l.shared[last]
			l.shared = l.shared[:last]
		}
		l.Unlock()
		if x == nil { // poolLocal->shared 上没有空闲
			x = p.getSlow() // 从别的P对应的shared上获取
		}
	}
	if race.Enabled {
		race.Enable()
		if x != nil {
			race.Acquire(poolRaceAddr(x))
		}
	}
	if x == nil && p.New != nil { // 从别的P对应的shared上也无法获取
		x = p.New() // 从堆上分配
	}
	return x
}

下面来看下怎么从别的P上偷一个object

func (p *Pool) getSlow() (x interface{}) {
	// See the comment in pin regarding ordering of the loads.
    // poolLocal的数量(其实就是P的数量)
	size := atomic.LoadUintptr(&p.localSize) // load-acquire
	local := p.local// 数组起始地址            // load-consume
	// Try to steal one element from other procs.
	pid := runtime_procPin()
	runtime_procUnpin()
	for i := 0; i < int(size); i++ { // 从别的P的poolLocal->shared上获取对象
		l := indexLocal(local, (pid+i+1)%int(size)) // 跳过当前的P
		l.Lock()
		last := len(l.shared) - 1
		if last >= 0 {
			x = l.shared[last]
			l.shared = l.shared[:last]
			l.Unlock()
			break
		}
		l.Unlock()
	}
	return x
}

至次一个正常的流程就已经走完了,下面来看下pin部分的代码:

func (p *Pool) pin() *poolLocal {
	pid := runtime_procPin()
	s := atomic.LoadUintptr(&p.localSize) // load-acquire
	l := p.local // 复制一份保证即使cleanUp也不影响 // load-consume
    if uintptr(pid) < s { // 可以直接使用p对应的Local
		return indexLocal(l, pid)
	}
	return p.pinSlow() // 没有足够的空间(第一次pin或者发生在两次GC之间)
}

func (p *Pool) pinSlow() *poolLocal {
	// Retry under the mutex.
	// Can not lock the mutex while pinned.
	runtime_procUnpin()
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	pid := runtime_procPin()
	// poolCleanup won't be called while we are pinned.
	s := p.localSize
	l := p.local
	if uintptr(pid) < s {
		return indexLocal(l, pid)
	}
    if p.local == nil { // 放到全局的Pool中(gc的时候需要回收)
		allPools = append(allPools, p)
	}
	// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
	// 两次GC 之间P个数发生了变更需要重新分配
    size := runtime.GOMAXPROCS(0)
	local := make([]poolLocal, size)
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
	atomic.StoreUintptr(&p.localSize, uintptr(size))         // store-release
	return &local[pid]
}

最后就是CleanUp了, 每次gc之前会调用CleanUp, 里面的逻辑也是无脑的将所有指针置为 nil.然后GC的时候会被回收。

Put的逻辑相对要简单很多,首先找到P对应的localPool, 如果private为空就放到private中,否则就放到poolLocal-->shared 中。

Mutex

RWMutex

SyncMap

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.