Git Product home page Git Product logo

blog's People

Contributors

samuelyao314 avatar

Watchers

 avatar  avatar

Forkers

carlcjb

blog's Issues

Go, 互联网时代的C语言

Go语言, 号称互联网时代的C语言。 它从一开始,就适合服务端的开发。
下面具体叙述Go语言为什么适合服务端的开发。

GO 产生的背景

天时

从 2005 年开始,时钟速率的增长和晶体管数量的增长已不再同步。由于处理器材料的物理性质限制,时钟速率已停止增长(甚至下降),处理器制造商开始将更多执行单元(核心)封装到单个芯片(插槽)中。这一趋势(似乎能够在可预见的未来继续保持)已开始给应用程序开发和编程语言开发社区带来越来越大的压力. 高效地利用可用 CPU 核心的惟一方法就是使用并行性。

在应用程序开发方面,基于线程的并发编程是实现并行性的主导机制。
但线程编程存在本质上的困难:

  • 共享对象的竞争问题。
  • 在使用锁的时候,如果不谨慎,容易造成死锁。
  • C标准库的API一般不是线程安全的。
  • 多线程程序很难调试。

所有,基于线程的编程模型不是多核时代的最佳选择.
我们需要更合适的并发模型。

地利

语言层面支持并发模型的, 暂时只有GO和Erlang.

许式伟认为

Erlang的困难之处在于它是FP语言. 我们缺乏深入人心的FP编程理论。我们并不了解FP“数据结构”学。这是Erlang语言无法逾越的门槛,决定了它只能是小众化语言。

在Baidu上搜索“招聘+erlang”, 相关结果约20,300个。
搜索”招聘+go“, 相关结果约1,240,000个。
我个人认为,许式伟的看法是对的。

人和

Go语言之父Rob Pike 这样说

在那一个小时的演讲中,我们大概听到了约35种计划中的新特性。当然实际上还有更多,
这时候,我问了自己一个问题:C++委员会真的认为C++的特性还不够多?当然,不同于Ron的玩笑,简化这门语言必是一门更大的成就!

Go语言社区吸引了大量程序员的参与, 截至2015-10-29日

社区活跃

https://golang.org/AUTHORS 统计,核心提交者有556人。

有人用BigQuery查询github的结果:

【图2】统计从2014年05月1日起到2015年8月8号,各种语言创建的repos数排名。
限定条件:fork数>3;repos大于20MB

从搜索引擎结果看,国内已经有很多公司在使用Go。 包括百度,阿里,豆瓣,陌陌等

语言的特点

Rob Pike 说

在那一个小时的演讲中,我们大概听到了约35种计划中的新特性。当然实际上还有更多。这时候,我问了自己一个问题:C++委员会真的认为C++的特性还不够多?当然,不同于Ron的玩笑,简化这门语言必是一门更大的成就!也许这很可笑,但是请把这个想法记在心里。

类C的语法

slice 和 array 的基本操作

arr := [5]int{1, 2, 3, 4, 5}
slice := arr[3 : 5] //  slice:[4, 5]
slice[0] = 0        // slice:[0, 5]
fmt.Println(slice)
fmt.Println(arr)

输出结果是

[0 5]
[1 2 3 0 5]

对Go的Slice进行Append的一个坑

func main() {
    arr1 := [5]int{1, 2, 3, 4, 5}
    slice1 := arr1[1:2]
    slice1 = append(slice1, 6, 7, 8)
    fmt.Println("slice1:", slice1)
    fmt.Println("arr1:", arr1)
    arr2 := [5]int{1, 2, 3, 4, 5}
    slice2 := arr2[1:3]
    slice2 = append(slice2, 6, 7, 8)
    fmt.Println("slice2:", slice2)
    fmt.Println("arr2:", arr2)
}

输出结果是

slice1: [2 6 7 8]
arr1: [1 2 6 7 8] //神奇地,原数组被改变了
slice2: [2 3 6 7 8]
arr2: [1 2 3 4 5] //一切正常

interface

type geometry interface {
    area() float64
    perim() float64
}

type rect struct {
    width, height float64
}

type circle struct {
    radius float64
}

func (r rect) area() float64 {
    return r.width * r.height
}
func (r rect) perim() float64 {
    return 2*r.width + 2*r.height
}

func (c circle) area() float64 {
    return math.Pi * c.radius * c.radius
}
func (c circle) perim() float64 {
    return 2 * math.Pi * c.radius
}

func measure(g geometry) {
    fmt.Println(g)
    fmt.Println(g.area())
    fmt.Println(g.perim())
}


func main() {
    r := rect{width: 3, height: 4}
    c := circle{radius: 5}

    measure(r)
    measure(c)
}

Go语言采用的是“非侵入式接口".

Goroutines

一个Go例程就是一个和其它Go例程在同一地址空间里但却独立运行的函数。

f("hello", "world") // f runs; we wait
go f("hello", "world") // f starts running
g() // does not wait for f to return

就像是在shell里使用 & 标记启动一个命令。

Go例程不是线程, 很像线程,但比线程更轻量。
一个程序里产生成千上万个Go例程很正常。

多个例程可以在系统线程上做多路通信。

当一个Go例程阻塞时,所在的线程会阻塞,但其它Go例程不受影响。

Channels

Do not communicate by sharing memory; instead, share memory by communicating

通道是类型化的值,能够被Go例程用来做同步或交互信息。

timerChan := make(chan time.Time)
go func() {
    time.Sleep(deltaT)
    timerChan <- time.Now() // send time on timerChan
}()
// Do something else; when ready, receive.
// Receive will block until timerChan delivers.
// Value sent is other goroutine's completion time.
completedAt := <-timerChan

Select

这select语句很像switch,但它的判断条件是基于通信,而不是基于值的等量匹配。

select {
case v := <-ch1:
    fmt.Println("channel 1 sends", v)
case v := <-ch2:
    fmt.Println("channel 2 sends", v)
default: // optional
    fmt.Println("neither channel was ready")
}

以软件工程为目的的语言设计

  • 静态类型语言,直接编译成机器码。不依赖glibc版本,基本没有链接动态库的问题
  • 内置强大的工具。Go语言里面内置了很多工具。社区也实现了很多有用的插件。
    • gofmt, 自动化格式化代码。
    • gocode, 代码自动补全
    • go-def, 代码查找时的自动跳转,如查看函数或结构体的定义。
    • gotags, 显示当前文件包含哪些函数定义,结构体定义
  • 内置runtime,支持垃圾回收,内置性能调优工具。
  • 丰富的标准库,Go目前已经内置了大量的库,特别是网络库非常强大
  • 语言层面支持并发

参考文献

skynet 学习笔记 - 沙盒服务

游戏服务器开发中的其中一个难点:隔离性。在C/C++写的服务器中,一行代码中的空指针访问,就会导致整个服务器进程crash。

解决方式是:沙盒机制。

Skynet 的沙盒是利用Lua 实现的, 称为服务 snlua

下面重点讲这个沙盒是如何实现的

  • Skynet 启动
  • 沙盒启动API
  • snlua 启动

Skynet 启动

Skynet 启动过程, 主要是启动了一些沙盒服务。

Skynet 配置文件一般是 Config 文件。

按照默认配置,启动时,部分日志如下:

    $ ./skynet examples/config
    [:01000001] LAUNCH logger 
    [:01000002] LAUNCH snlua bootstrap
    [:01000003] LAUNCH snlua launcher
    [:01000004] LAUNCH snlua cmaster
    [:01000004] master listen socket 0.0.0.0:2013
    [:01000005] LAUNCH snlua cslave
    [:01000005] slave connect to master 127.0.0.1:2013
    [:01000004] connect from 127.0.0.1:55126 4
    [:01000006] LAUNCH harbor 1 16777221
    [:01000004] Harbor 1 (fd=4) report 127.0.0.1:2526
    [:01000005] Waiting for 0 harbors
    [:01000005] Shakehand ready
    [:01000007] LAUNCH snlua datacenterd
    [:01000008] LAUNCH snlua service_mgr
    [:01000009] LAUNCH snlua main
    ...

第一个启动的服务是 logger ,这个服务在之前已经介绍过了,是用C语言实现的。用来打印日志。

bootstrap 这个配置项关系着 skynet 运行的第二个服务。默认的 bootstrap 配置项为

snlua bootstrap

这意味着,skynet 会启动一个 snlua 沙盒服务,并将 bootstrap 作为参数传给它。

按默认配置,服务会加载 service/bootstrap.lua 作为入口脚本。启动后,这个 snlua 服务同样可以称为 bootstrap 服务。

bootstrap 服务, 会根据配置启动其他系统服务, 其中启动了 launcher 服务。更多细节可以见Bootstrap

最后,它启动了 main 服务。 main.lua 就是业务逻辑的入口。

沙盒启动API

Lua代码里, 启动其他沙盒服务有2个API

  • skynet.launch
  • skynet.newservice

例如,服务 bootstrap 启动服务 launcher

    -- bootstrap.lua
    local launcher = assert(skynet.launch("snlua","launcher"))

代码跟踪:

  • manage.lua @launch
  • lua-skynet.c @lcommand
  • skynet_service.c @skynet_command
  • skynet_service.c @cmd_launch
  • skynet_service.c @skynet_context_new

最终载入了一个 snlua 服务,用 launcher.lua 作为入口脚本。

那么, skynet.newservice 有什么不同那 ?

这个函数跟 launch 的区别是: 通过发送消息给服务 launcher, 由 launcher 来统一启动指定服务。

代码跟踪:

  • skynet.lua @newservice
  • 执行 skynet.call(".launcher", "lua" , "LAUNCH", "snlua", name, ...)
  • 触发 launcher 执行代码
    • launcher.lua @command.LAUNCH
    • 执行 skynet.launch(service, param)

下面讲沙盒具体的启动过程

snlua 启动

启动服务 launcher 取例

skynet_context_new("snlua", "launcher")

服务的创建函数

struct snlua {
	lua_State * L;
	struct skynet_context * ctx; // 服务的句柄
	size_t mem;         // 当前使用的内存量,单位是byte
	size_t mem_report;  // 每次超过这个值,会产生日志告警
	size_t mem_limit;   // 内存上限
};

struct snlua *
snlua_create(void) {
	struct snlua * l = skynet_malloc(sizeof(*l));
	memset(l,0,sizeof(*l));
	l->mem_report = MEMORY_WARNING_REPORT;
	l->mem_limit = 0;
	l->L = lua_newstate(lalloc, l);
	return l;
}

每一个 snlua 服务都绑定了一个Lua VM。 Lua VM实现是线程安全的。

既然可以限制每个VM的内存,那么应该限制多少?

官方的建议:

玩家代理服务,可以设置上限到 128 M 左右。当然以过往经验,在正常情况通常应保持在 10M 以下。

读者可能还有一个疑问:每个服务一个 Lua VM, 函数字节码在进程里不是有很多份吗?

针对这个问题,云风大牛已经解决了:对Lua源码做了修改,可以支持多个Lua VM 共用函数字节码。

下面,看初始化函数

int
snlua_init(struct snlua *l, struct skynet_context *ctx, const char * args) {
	int sz = strlen(args);
	char * tmp = skynet_malloc(sz);
	memcpy(tmp, args, sz);
	skynet_callback(ctx, l , launch_cb);
	const char * self = skynet_command(ctx, "REG", NULL);
	uint32_t handle_id = strtoul(self+1, NULL, 16);
	skynet_send(ctx, 0, handle_id, PTYPE_TAG_DONTCOPY,0, tmp, sz);
	return 0;
}
  • 绑定了这个服务的回调函数是 launch_cb
  • 服务自己发了第一个包给自己, 内容是 "launcher"

消息触发执行 launch_cb

static int
launch_cb(struct skynet_context * context, void *ud, int type, int session, uint32_t source , const void * msg, size_t sz) {
	assert(type == 0 && session == 0);
	struct snlua *l = ud;
	skynet_callback(context, NULL, NULL);
	init_cb(l, context, msg, sz);
    ...
}
  • 先取消了之前的消息处理函数。 这个服务不需要消息处理函数吗? 别着急,答案在下面。
  • init_cb 进行具体的初始化

函数 init_cb

static int
init_cb(struct snlua *l, struct skynet_context *ctx, const char * args, size_t sz) {
	lua_State *L = l->L;
	l->ctx = ctx;
	// 省略 ...
	lua_pushlightuserdata(L, ctx);
	lua_setfield(L, LUA_REGISTRYINDEX, "skynet_context");
	// 省略 ...

	lua_pushcfunction(L, traceback);
	assert(lua_gettop(L) == 1);
	const char * loader = optstring(ctx, "lualoader", "./lualib/loader.lua");
	int r = luaL_loadfile(L,loader);
	if (r != LUA_OK) {
		skynet_error(ctx, "Can't load %s : %s", loader, lua_tostring(L, -1));
		report_launcher_error(ctx);
		return 1;
	}
	lua_pushlstring(L, args, sz);
	r = lua_pcall(L,1,0,1);
	if (r != LUA_OK) {
		skynet_error(ctx, "lua loader error : %s", lua_tostring(L, -1));
		report_launcher_error(ctx);
		return 1;
	}
	// 省略 ...
}
  • 设置寄存器的值, register["skynet_context"] = ctx. 这个值之后会被 lua_skynet.c 这个模块访问。
    这样,Lua 库函数就可以使用服务的API了。
  • 加载 loader.lua, 这里参数跟踪是字符串 "launcher", 错误处理函数是 traceback 。
  • loader.lua 功能非常简单,根据配置项 LUA_SERVICE 找到服务 launcher 对应的源码。这里默认是 launcher.lua 。

Lua 加载 lancher.lua 。 最重要的是在Lua代码中注册了服务的消息分发函数

skynet.register_protocol {
	name = "text",
	id = skynet.PTYPE_TEXT,
	unpack = skynet.tostring,
	dispatch = function(session, address , cmd)
		if cmd == "" then
			command.LAUNCHOK(address)
		elseif cmd == "ERROR" then
			command.ERROR(address)
		else
			error ("Invalid text command " .. cmd)
		end
	end,
}

skynet.dispatch("lua", function(session, address, cmd , ...)
	cmd = string.upper(cmd)
	local f = command[cmd]
	if f then
		local ret = f(address, ...)
		if ret ~= NORET then
			skynet.ret(skynet.pack(ret))
		end
	else
		skynet.ret(skynet.pack {"Unknown command"} )
	end
end)

对每一种类型的消息,都需要注册一个Lua 分发函数。

function skynet.register_protocol(class)
	local name = class.name
	local id = class.id
	assert(proto[name] == nil)
	assert(type(name) == "string" and type(id) == "number" and id >=0 and id <=255)
	proto[name] = class
	proto[id] = class
end

那么,服务收到一个消息后,又是如何执行这个Lua 分发函数的?

lancher.lua 最后一行

	skyent.start(function () end)

function skynet.start(start_func)
	c.callback(skynet.dispatch_message)
	-- 这里可以理解为,直接执行初始化函数 start_func
	skynet.timeout(0, function()
		skynet.init_service(start_func)
	end)
end
  • skynet.start 是服务启动的最后一行代码。 调用了 c.callback
  • c.callback 就是 lua-skynet.c@lcallback
  • lcallback调用 skynet_callback。绑定消息回调函数 _cb.
static int
lcallback(lua_State *L) {
	// 取出服务的上下文
	struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
	int forward = lua_toboolean(L, 2);
	luaL_checktype(L,1,LUA_TFUNCTION);
	lua_settop(L,1);
	// 寄存器中保存分发函数,register[&_cb] = skynet.dispatch_message
	lua_rawsetp(L, LUA_REGISTRYINDEX, _cb);
	// 取出状态机的主线程,注意:snlua 沙盒是由主线程进行调度
	lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD);
	// 主线程 
	lua_State *gL = lua_tothread(L,-1);

	// forward 模式下,这个消息处理完,并不释放内存
	if (forward) {
		skynet_callback(context, gL, forward_cb);
	} else {
		skynet_callback(context, gL, _cb);
	}

	return 0;
}
  • 当这个服务收到消息时,会触发 _cb 函数.
  • _cb 函数,驱动Lua虚拟机的主线程,执行 skynet.dispatch_message.
  • skynet.dispatch_message 会调用 skynet.lua@raw_dispatch_message
	-- 代码片段来自 skynet.lua@raw_dispatch_message
	local p = proto[prototype]
	local f = p.dispatch
	-- 针对这个新请求,创建出一个线程。切换协程
	local co = co_create(f)
	session_coroutine_id[co] = session
	session_coroutine_address[co] = source
	suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))

通过 proto,消息就能被之前 skynet.register_protocol 注册的分发函数进行处理了。

总结:

  • skynet.newservice 可以启动一个沙盒服务
  • skynet.register_protocol 或者 skynet.dispatch 可以注册沙盒的消息分发函数

skynet 学习笔记 - 消息队列

介绍

Skynet 是一个为网络游戏服务器设计的轻量框架。

这个游戏框架的特点是:

  • 实现一个类似 Erlang 的 Actor模型的服务端编程环境
  • 运行效率高,追求单机性能
  • 不关注分布式,追求高实时的相应速度
  • 业务层采用 Lua 沙盒,开发调试方便。
  • 适合对C/Lua 熟悉的团队

消息调度机制

Skyent 核心部分是一个消息调度机制。示意图如下

说明:

  • Skynet 是一个独立的进程,其中运行着若干个worker线程。
  • worker 线程会从消息队列中取出消息,找到对应的处理函数,进行分发。
  • timer 线程实现了定时机制。
  • socket 线程主要工作是监听 epoll 事件,管理网络操作。

我会用单独的篇幅来分析各个重点

  • 消息队列
  • 服务
  • 沙盒服务
  • snlua调度
  • 定时器
  • Socket
  • 玩家代理服务

首先是消息队列

消息队列

Skynet 维护了两级消息队列。

  • 每个服务都有一个私有的消息队列。队列中是一个个发送给它的消息。
  • 一个全局消息队列。里面放的是若干个不为空的服务队列。

下面,主要内容是

  • 服务队列的结构
  • 服务队列操作
  • 全局消息队列的结构
  • 全局消息队列操作
  • 工作线程分发消息
  • 参考资料

服务队列的结构

    struct message_queue {
        uint32_t handle;  // 服务地址
        int cap;          // 数组
        int head;         // queue 实现了循环队列。 head 和 tail 分别是头和尾
        int tail;
        struct skynet_message *queue;   // 保存消息的数组. 
        struct message_queue *next;
        ...
    };

由于服务队列是属于服务的,所以服务队列的生命周期和服务一致:载入服务的时候生成,卸载服务的时候删除。

服务是通过 skynet_context_new 载入的,在此函数中,可以找到对应的服务队列的生成语句:

    struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);

    struct message_queue * 
    skynet_mq_create(uint32_t handle) {
        struct message_queue *q = skynet_malloc(sizeof(*q));
        q->handle = handle;
        q->cap = DEFAULT_QUEUE_SIZE;       // 队列大小
        q->head = 0;
        q->tail = 0;
        // ...
        q->queue = skynet_malloc(sizeof(struct skynet_message) * q->cap);
        q->next = NULL;

        return q;
    }

handle 就是队列所属服务的地址。 通过 handle, 就可以找到服务对应的结构体。

可以看到,queue 是个数组,可以存放 DEFAULT_QUEUE_SIZE 个消息, 默认大小是 1024个。

实际上 queue 是用数组实现了一个循环队列。

服务队列操作

服务队列主要支持2个操作

  • 向服务队列中添加消息
  • 从服务队列中取出消息

添加消息到队列中,如果队列满了,会触发自动扩容的操作 expand_queue . 新扩容的数组大小是原先的2倍。

void 
skynet_mq_push(struct message_queue *q, struct skynet_message *message) {
	q->queue[q->tail] = *message;
	if (++ q->tail >= q->cap) {
		q->tail = 0;
	}
	if (q->head == q->tail) {
		expand_queue(q);
	}

	...
}

那么,取出消息后,数组占有空间少的时候,会收缩吗 ? 答案是不。

int
skynet_mq_pop(struct message_queue *q, struct skynet_message *message) {
	// ...
	if (q->head != q->tail) {
		// 这个就是取出的消息
		*message = q->queue[q->head++];
		int head = q->head;
		int tail = q->tail;
		int cap = q->cap;
 		// head 重新指向数组头   
		if (head >= cap) {
			q->head = head = 0;
		}
		int length = tail - head;
		if (length < 0) {
			length += cap;
		}
	} 

	// ...
}

全局消息队列的结构

Skynet 进程只有1个全局消息队列。 在Skynet 启动的时候会进行初始化。

    skynet_mq_init(); 

    struct global_queue {
        struct message_queue *head;      // 指向第一个服务队列
        struct message_queue *tail;      // 指向最后一个服务队列
        struct spinlock lock;            // 并发同步
    };

你可能很好奇, 这个链表,如何指向下一个成员。 其实之前已经提到了

    struct message_queue {
        ...
        struct message_queue *next;
    }

为了效率,并不是简单的把所有的服务队列都塞到全局队列中,而是只塞入非空的服务队列,
这样worker线程就不会得到空的服务队列而浪费CPU。

全局消息队列操作

全局消息队列主要支持2个操作

  • 向全局队列中添加服务队列
  • 从全局队列中取出服务队列

添加服务队列

void 
skynet_globalmq_push(struct message_queue * queue) {
	struct global_queue *q= Q;
	SPIN_LOCK(q)
	assert(queue->next == NULL);
	if(q->tail) {
		q->tail->next = queue;
		q->tail = queue;
	} else {
		q->head = q->tail = queue;
	}
	SPIN_UNLOCK(q)
}

取出服务队列

struct message_queue * 
skynet_globalmq_pop() {
	struct global_queue *q = Q;

	SPIN_LOCK(q)
	struct message_queue *mq = q->head;
	if(mq) {
		q->head = mq->next;
		if(q->head == NULL) {
			assert(mq == q->tail);
			q->tail = NULL;
		}
		mq->next = NULL;
	}
	SPIN_UNLOCK(q)

	return mq;
}

这些操作都使用了锁进行保护。早期版本,采用无锁机制,结果引入了并发的BUG

工作线程分发消息

Skynet 启动多个worker线程进行消息分发,线程个数是可以设置的,官方建议配置为cpu核数。

每个 worker 线程执行的入口函数是 thread_worker

	// skynet_start.c#start 
	for (i=0;i<thread;i++) {
		// ...
		create_thread(&pid[i+3], thread_worker, &wp[i]);
	}

忽略枝叶, thread_worker 会不停地调用 skynet_context_message_dispatch

struct message_queue * 
skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
	// 从全局队列中取出一个服务队列
	if (q == NULL) {
		q = skynet_globalmq_pop();
		if (q==NULL)
			return NULL;
	}

	// 找到服务队列所属的服务上下文
	uint32_t handle = skynet_mq_handle(q);
	struct skynet_context * ctx = skynet_handle_grab(handle);
	// ...

	// 为了调度公平,每次只弹出一个消息
	int i,n=1;
	struct skynet_message msg;

	for (i=0;i<n;i++) {
		if (skynet_mq_pop(q,&msg)) {
			skynet_context_release(ctx);
			return skynet_globalmq_pop();
		} else if (i==0 && weight >= 0) {
			n = skynet_mq_length(q);
			n >>= weight;
		}
		// 检查服务是否过载
		int overload = skynet_mq_overload(q);
		if (overload) {
			skynet_error(ctx, "May overload, message queue length = %d", overload);
		}

		skynet_monitor_trigger(sm, msg.source , handle);

		if (ctx->cb == NULL) {
			skynet_free(msg.data);
		} else {
			// 进行消息分发
			dispatch_message(ctx, &msg);
		}

		skynet_monitor_trigger(sm, 0,0);
	}

	// ...

	return q;
}

函数 _dispatch_message 会调用这个服务的 callback 。

static void
_dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
	int type = msg->sz >> HANDLE_REMOTE_SHIFT;
	size_t sz = msg->sz & HANDLE_MASK;
	// ...
	if (!ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz))
		skynet_free(msg->data);
}

那么,如何设置这个 callback ? 下一篇再来回答。

skynet 学习笔记 - 定时器

本章的内容是

  • 设置定时器
  • skynet.timeout
  • 定时器的实现
  • 伪取消定时器

设置定时器

在业务Lua代码里设置定时器的接口是

    -- 参数 ti: number
    -- 参数 func: function
    -- 框架在 ti 个单位时间后,调用 func 这个函数。
    skynet.timeout(ti, func)

定时器实现的非常高效,一般不用太担心性能问题。

如果你的服务想大量使用定时器的话,可以考虑:在一个服务里,只使用一个 skynet.timeout,用它来触发自己的定时事件模块。

skynet.timeout

skynet.timeout 实现

function skynet.timeout(ti, func)
    local session = c.intcommand("TIMEOUT",ti)
    assert(session)
    local co = co_create(func)
    assert(session_id_coroutine[session] == nil)
    session_id_coroutine[session] = co
end
  • c.intcommand("TIMEOUT",ti) 在框架里注册了一个定时事件。事件关联了服务的唯一session
  • 分配一个协程。关联session和协程。
  • 在事件到期后,可以根据session, 找到这个协程,执行回调函数 func

再看看 c.intcommand("TIMEOUT",ti) 的实现

  • c.intcommand 就是 lua-skynet.c@lintcommand
  • lintcommand 调用 skynet_command(context, "TIMEOUT", parm)
  • skynet_command 调用 cmd_timeout

cmd_timeout 实现

static const char *
cmd_timeout(struct skynet_context * context, const char * param) {
	char * session_ptr = NULL;
	int ti = strtol(param, &session_ptr, 10);
	int session = skynet_context_newsession(context);
	skynet_timeout(context->handle, ti, session);
	sprintf(context->result, "%d", session);
	return context->result;
}
  • 向框架申请,分配了这个服务唯一的session
  • skynet_timeout 注册了定时事件。 定时事件里记录了服务的handler, session

skynet 启动后,有1个线程执行

create_thread(&pid[1], thread_timer, m);

忽略很多细节,这个函数主要是不停地调用

	for (;;) {
		skynet_updatetime();
		// ...
	}

忽略细节,随着时间变化,对应的定时器到期,会触发

static inline void
dispatch_list(struct timer_node *current) {
	do {
		struct timer_event * event = (struct timer_event *)(current+1);	//取出event,然后对skynet消息赋值
		struct skynet_message message;
		message.source = 0;
		message.session = event->session;
		message.data = NULL;
		message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT;

		skynet_context_push(event->handle, &message);	//将消息压入相应的服务
		
		struct timer_node * temp = current;
		current=current->next;
		skynet_free(temp);	//处理完之后才释放内存
	} while (current);
}

dispatch_list 根据定时事件event 关联的source, session, 发送一个消息给对应的服务。

那么,服务收到后,就可以根据session 找到之前分配的协程了。

定时器的实现

定时器实现方案,从原理来说,非常简单。 伪代码如下

def update_timer():
     current = get_cur_time()
     for timer in reg_timer_list:
         if timer.expeires > current:
             timer.callback()

由时钟来驱动 update_timer。检查到期的定时事件,进行回调。

云大在skynet 里采用了时间轮的实现。按8/6/6/6/6/分成5个部分,也就是有5个时钟,这种分层方法的空间复杂度变为 256+64+64+64+64= 512个槽,支持注册最长时间的tick是 25664646464=2^32。每个ticket 时长是 1/100秒

个人觉得:针对这个精度,时间轮太复杂了,采用更简单的实现,例如优先级队列。

更多关于定时器,可以看我之前的文章: 各种定时器的实现

伪取消定时器

skynet 在机制上不支持取消之前注册的定时器。

当倒计时结束以后,执行的不是调用定时器时注册的回调,而是更改了的回调。
如果这个回调不会做任何事情,这就是伪取消。

实现代码

local function remove_timeout_cb(...)
end

function skynet.remove_timeout(session)
    local co = co_create(remove_timeout_cb)
    assert(session_id_coroutine[session] ~= nil)
    session_id_coroutine[session] = co
end

libphenom 学习笔记

参考资料:

引用计数

源码: include/phenom/refcnt.h

typedef int ph_refcnt_t;
void ph_refcnt_add(ph_refcnt_t *ref)
// Returns true if we just released the final reference
bool ph_refcnt_del(ph_refcnt_t *ref)

引用计数管理对象的生命期

void ph_string_delref(ph_string_t *str)
{
  if (!ph_refcnt_del(&str->ref)) {
    return;
  }

  if (str->mt >= 0) {
    ph_mem_free(str->mt, str->buf);
    str->mt = PH_MEMTYPE_INVALID;
  }
  if (str->slice) {
    ph_string_delref(str->slice);
    str->slice = 0;
  }
  str->buf = 0;
  if (!str->onstack) {
    ph_mem_free(mt_string, str);
  }
}

Counter

源码: corelib/counter.c; include/phenom/counter.h; tests/counter.c; corelib/debug_console.c

计数器。 用来了解事物发生的频率。实际用在memory, job子系统中。

scope就是一系列在逻辑上处于同一组的counter的集合概念。
在使用counter的时候最初就需要创建scope。
在定义scope的时候需要确定该scope内最多能有多少个counter注册进去,这个叫slot。
scope互相之间可以有父子继承关系。

我们要创建block的scenario只有两个:
当你在同一个线程内需要频繁进行计数器更新的时候;
当你在一个线程内对多个计数器进行更新,并期望这个操作尽可能快的时候;

开启debug-console, 可以输出系统的计数。

~$> echo counters | nc -UC /tmp/phenom-debug-console 
                  iosched/dispatched             5144
                  iosched/timer_busy                0
                 iosched/timer_ticks             5035
          memory.ares.channel/allocs                1
           memory.ares.channel/bytes              104
           memory.ares.channel/frees                0
             memory.ares.channel/oom                0

上面的最高层的scope是memory和iosched。 memory的子scope是area, area的子scope是channel.
channel里面有4个slots, 分别记录了4个counter. name分别是alloc, bytes, frees, oom.
对应的counter分别是1, 104, 0, 0.

memory

源码:corelib/memory.c; include/phenom/memory; tests/memory.c; corelib/debug_console.c

基于counter子系统的内存分配器。

通过下面2个函数注册新的memtype。

ph_memtype_t ph_memtype_register(const ph_memtype_def_t *def);
ph_memtype_t ph_memtype_register_block(
    uint8_t num_types,
    const ph_memtype_def_t *defs,
    ph_memtype_t *types);

memtype支持的操作,malloc, realloc, free

void *ph_mem_alloc(ph_memtype_t memtype)
void *ph_mem_alloc_size(ph_memtype_t memtype, uint64_t size)
void *ph_mem_realloc(ph_memtype_t memtype, void *ptr, uint64_t size)
void  ph_mem_free(ph_memtype_t memtype, void *ptr)

通过下面函数就可以了解内存的分配情况

void ph_mem_stat(ph_memtype_t memtype, ph_mem_stats_t *stats);
struct ph_mem_stats {
  /* the definition */
  const ph_memtype_def_t *def;
  /* current amount of allocated memory in bytes */
  uint64_t bytes;
  /* total number of out-of-memory events (allocation failures) */
  uint64_t oom;
  /* total number of successful allocation events */
  uint64_t allocs;
  /* total number of calls to free */
  uint64_t frees;
  /* total number of calls to realloc (that are not themselves
   * equivalent to an alloc or free) */
  uint64_t reallocs;
};

开启debug-console, 可以输出内存使用情况 (非常酷)

$> echo memory | nc -UC /tmp/phenom-debug-console
                WHAT     BYTES       OOM    ALLOCS     FREES   REALLOC
    threadpool/pool       832         0         1         0         0 
 threadpool/ringbuf      8480         0         2         0         0 
    hashtable/table      3136         0         3         0         0 
          hook/hook         8         0         1         0         0 
          hook/head         0         0         0         0         0 
        hook/string        19         0         1         0         0 
         hook/unreg         0         0         0         0         0 
      stream/stream       272         0         2         0         0 
      buffer/object       120         0         3         0         0 
          buffer/8k     16384         0         2         0         0 
         buffer/16k         0         0         0         0         0 
         buffer/32k         0         0         0         0         0 
         buffer/64k         0         0         0         0         0 
       buffer/vsize         0         0         0         0         0 
       buffer/queue        48         0         2         0         0 
   buffer/queue_ent        64         0         2         0         0 

strings

源码: corelib/string.c; include/phenom/string.c; tests/string.c;

设计目标: http://facebook.github.io/libphenom/#string

实现

 typedef struct ph_string ph_string_t;
   
 struct ph_string {
   ph_refcnt_t ref;   // 引用计数
   ph_memtype_t mt;
   uint32_t len, alloc;   // 使用字节数,总字节数 
   char *buf;    // 指向实际的存储
   ph_string_t *slice;
   bool onstack;   // 是否在stack上
 }; 

其中参数mt的值, 用负的表示stack-based growable,正的表示heap-allocated growable

ph_result_t ph_string_append_buf(ph_string_t *str,
const char *buf, uint32_t len)
{
    if (len + str->len > str->alloc) {
    // Not enough room
    if (str->mt == PH_STRING_STATIC) {
      // Just clamp to the available space
      len = str->alloc - str->len;
    } else {
      // Grow it
      uint32_t nsize = ph_power_2(str->len + len);
      char *nbuf;

      // Negative memtypes encode the desired memtype as the negative
      // value.  Allocate a buffer from scratch using the desired memtype
      if (str->mt < 0) {
        nbuf = ph_mem_alloc_size(-str->mt, nsize);
      } else {
        nbuf = ph_mem_realloc(str->mt, str->buf, nsize);
      }

      if (nbuf == NULL) {
        return PH_NOMEM;
      }

      if (str->mt < 0) {
        // Promote from static growable to heap allocated growable
        memcpy(nbuf, str->buf, str->len);
        str->mt = -str->mt;
      }
      str->buf = nbuf;
      str->alloc = nsize;
    }
  }

  memcpy(str->buf + str->len, buf, len);
  str->len += len;
  return PH_OK;
}

slice的创建

ph_string_t *ph_string_make_slice(ph_string_t *str,
    uint32_t start, uint32_t len)
{
  ph_string_t *slice;

  if (start == 0 && len == str->len) {
    ph_string_addref(str);
    return str;
  }

  slice = ph_mem_alloc(mt_string);
  if (!slice) {
    return NULL;
  }

  ph_string_init_slice(slice, str, start, len);
  return slice;
}

子模块的初始化

例如 memory.c 里有以下指令

PH_LIBRARY_INIT_PRI(memory_init, memory_destroy, 3)

include/phenom/defs.h定义

void ph_library_init_register(struct ph_library_init_entry *ent);
#define PH_LIBRARY_INIT_PRI(initfn, finifn, pri) \
  static __attribute__((constructor)) \
    void ph_defs_gen_symbol(ph__lib__init__)(void) { \
        static struct ph_library_init_entry ent = { \
              __FILE__, __LINE__, pri, initfn, finifn, 0 \
                };
        ph_library_init_register(&ent); \
}

attribute((constructor)), 使的函数体在main开始运行前,自动调用;
具体见 http://gcc.gnu.org/onlinedocs/gcc/Function-Attributes.html;
所以meory_init, memory_destroy被注册

每1个使用libphenom的程序都要求,先调用ph_library_init,每1个注册init函数都被执行。

for (i = 0; i < num_init_ents; i++) {
    struct ph_library_init_entry *ent = init_funcs[i];
    if (ent->init) {
      ent->init();
    }
}

Stream

源码: 目录 corelib/streams; include/phenom/stream.h; tests/stream.c

libPhenom provides a portable layer over streaming IO.
CSAPP解释了标准IO为什么不能使用在socket上。
stream支持socket, ssl, fd, string.

实现

/** Represents a stream
 *
 * Streams maintain a buffer for read/write operations.
 */
struct ph_stream {
  const struct ph_stream_funcs *funcs;
  void *cookie;
  unsigned flags;
  pthread_mutex_t lock;
  // if data is in the read buffer, these are non-NULL
  unsigned char *rpos, *rend;
  // if data is in the write buffer, these are non-NULL
  unsigned char *wpos, *wend;
  unsigned char *wbase;
  // associated buffer.  It can be either used in read mode
  // or write mode, but not both
  unsigned char *buf;
  uint32_t bufsize;
  int last_err;
  ph_iomask_t need_mask;
};
/** Defines a stream implementation.
 *
 * If any of these return false, it indicates an error.
 * The implementation must set stm->last_err to the corresponding
 * errno value in that case (and only in the failure case).
 */
struct ph_stream_funcs {
  bool (*close)(ph_stream_t *stm);
  bool (*readv)(ph_stream_t *stm, const struct iovec *iov,
      int iovcnt, uint64_t *nread);
  bool (*writev)(ph_stream_t *stm, const struct iovec *iov,
      int iovcnt, uint64_t *nwrote);
  bool (*seek)(ph_stream_t *stm, int64_t delta,
      int whence, uint64_t *newpos);
};

读写公用1个缓存区。 通过定义 struct ph_stream_funcs, 来支持不同流类型。

buffers

源码: corelib/buf.c; include/phenom/buffer.h; tests/buf.c

设计目标: http://facebook.github.io/libphenom/index.html#buffer;

ph_buf_t作为ph_bufq_t的底层实现,并没有单独使用

struct ph_buf {
  ph_refcnt_t ref;
  ph_buf_t *slice;
  uint8_t *buf;
  uint64_t size;
  ph_memtype_t memtype;
};

ph_buf_t *ph_buf_new(uint64_t size);
ph_buf_t *ph_buf_slice(ph_buf_t *buf, uint64_t start, uint64_t len);

ph_buf_new 创建1个新的buffer, 新的buffer的大小,调用函数select_size。主要分为8192,16k, 32k等。
ph_buf_slice 创建1个slice, slice实际上没有分配内存。
特殊情况:start=0, len等于buf的长度,只是ph_buf_addref(buf).

buf子系统中不同的内存分配,都分别区分开。

static ph_memtype_def_t defs[] = {
  { "buffer", "object", sizeof(ph_buf_t), PH_MEM_FLAGS_ZERO },
  { "buffer", "8k", 8*1024, 0 },
  { "buffer", "16k", 16*1024, 0 },
  { "buffer", "32k", 32*1024, 0 },
  { "buffer", "64k", 64*1024, 0 },
  { "buffer", "vsize", 0, 0 },
  { "buffer", "queue", sizeof(ph_bufq_t), PH_MEM_FLAGS_ZERO },
  { "buffer", "queue_ent", sizeof(struct ph_bufq_ent), PH_MEM_FLAGS_ZERO },
};

ph_bufq_t,用作socket的用户层buffer。

struct ph_bufq_ent {
  PH_STAILQ_ENTRY(ph_bufq_ent) ent;
  ph_buf_t *buf;
  // Offset into the buf of the data that is yet to be consumed
  uint64_t rpos;
  // Offset at which to append further data
  uint64_t wpos;
};
struct ph_bufq {
  PH_STAILQ_HEAD(bufqhead, ph_bufq_ent) fifo;
  // Maximum amount of storage to allow
  uint64_t max_size;   // 现在好像没有用? 20131114
};

ph_bufq_t *ph_bufq_new(uint64_t max_size);
ph_result_t ph_bufq_append(ph_bufq_t *q, const void *buf, uint64_t len,
    uint64_t *added_bytes);
ph_buf_t *ph_bufq_consume_bytes(ph_bufq_t *q, uint64_t len);
ph_buf_t *ph_bufq_consume_record(ph_bufq_t *q, const char *delim,
    uint32_t delim_len);

ph_bufq_new 创建出1个定长buffer的fifo. 默认会在fifo里放1个8192长度的buffer.
ph_bufq_append 对ph_bufq_t插入数据. 如果最后1个buffer容量不够,就会创建出1个新的buffer, 放到fifo里.
ph_bufq_consume_bytes 从ph_bufq_t读出数据。gc_bufq用来释放资源。 返回的ph_buf_t是重新创建的。
ph_bufq_consume_record 读取数据到指定的record. 例如读取到"\r\n". 调用函数find_record,需要很有耐心的实现。

Json

源码:include/phenom/json.h; 目录 corelib/variant/

提供了json的encoding,decoding的功能。

Configuration

源码: phenom/configuration.h; corelib/config.c

程序启动时有全局的配置文件(json格式), 修改程序一些行为。
该文件可以通过ph_config_load_config_file或者 getenv("PHENOM_CONFIG_FILE")来指定。
例如job.c 里, 可以设置下面的参数来指定sleep时间

int max_sleep = ph_config_query_int("$.nbio.max_sleep", 5000);

建议应用自己的配置在路径 "$.app."下

timer wheel

源码: include/phenom/timerwheel.h, corelib/timerwheel.c

timer wheel, 是一种定时器实现机制。概念来自"Hashed and Hierarchical Timing Wheels".
用来管理大量的定时器。Linux内核中也用这种实现。

Alt text

定时轮的工作原理可以类比于时钟,如上图; 指针按某一个方向按固定频率轮动,每一次跳动称为一个tick。
这样可以看出定时轮由个3个重要的属性参数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间)
以及 timeUnit(时间单位),例如 当ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和现实中的始终的秒针走动完全类似了。

实现

PH_LIST_HEAD(            // 双向的循环链表head, 具体见phenom/queue.h
    ph_timerwheel_list,
    ph_timerwheel_timer);

struct ph_timerwheel_timer {
  PH_LIST_ENTRY(ph_timerwheel_timer) t;
  struct ph_timerwheel_list *list;
  struct timeval due;
  int enable;
  #define PH_TIMER_DISABLED    0
  #define PH_TIMER_ENABLED     1
  #define PH_TIMER_LOCKED      2
};

#define PHENOM_WHEEL_BITS 8
#define PHENOM_WHEEL_SIZE (1 << PHENOM_WHEEL_BITS)     // 256

struct ph_timerwheel {
  struct timeval next_run;   // 下1个tick的实际时间
  uint32_t tick_resolution;  // 每个tick的时间间隔
  ck_rwlock_t lock;
  struct {
    struct ph_timerwheel_list lists[PHENOM_WHEEL_SIZE];
  } buckets[4];
};

Alt text

Bucket 0 represents those events that are due the soonest.
Each tick causes us to look at the next list in a bucket.
The 0th list in a bucket is special; it means that it is time to
flush the timers from the next higher bucket and schedule them
into a different bucket.

ph_timerwheel提供了4个buckets, buckets存在着类似时分秒的进位关系;
下面用TV1标识buckets[0], 以此类推, TV4标识buckets[3];

TV1为第1个表,所表示的计时是 1 ~ 255 tick.
因为在一个tick上可能同时有多个timer等待超时处理,
使用ph_timerwheel_list将所有timer 串成一个链表,以便在超时时顺序处理;

TV2为第2个表, 所表示的计时是 256 ~ 65535 tick.
以此类推TV3, TV4;

在nbio子系统中,tick_resolution=100ms,每过100ms, 每1个事件循环会触发ph_timerwheel_tick函数。
用来处理下一个tick所在的所有timer.

ph_timerwheel_tick(ph_timerwheel_t *wheel,
    struct timeval now, 
    ph_timerwheel_should_dispatch_func_t should_dispatch, 
    ph_timerwheel_dispatch_func_t dispatch,  
    void *arg) 

idx 是用来遍历 TV1 的索引。每一次循环idx会定位一个当前待处理的 tick,并处理这个tick下所有超时的timer。
wheel->next_run会在每次循环后增加一个 tick_resolution,index也会随之向前移动。当index变为0时表示TV1完成了一次完整的遍历,
此时所有在 TV1 中的 timer 都被处理了,因此需要通过 cascade 将后面 TV2,TV3 等 timer list 中的timer向前移动,类似于分转成秒的操作。
这种层叠的 timer list 实现机制可以大大降低每次检查超时, timer的时间,每次中断只需要针对 TV1 进行检查,只有必要时才进行cascade。

timer wheel一个弊端就是 cascade 开销过大。 在极端的条件下,同时会有多个TV需要进行cascade处理,会产生很大的时延。
这也是为什么说timeout类型的定时器是timer wheel 的主要应用环境,或者说timer wheel 是为 timeout 类型的定时器优化的。
因为timeout类型的定时器的应用场景多是错误条件的检测,这类错误发生的机率很小,通常不到超时就被删除了,因此不会产生cascade的开销。

nbio子系统,
初始化过程,ph_nbio_init--> ph_timerwheel_init(&emitters[i].wheel, me->now, WHEEL_INTERVAL_MS);
函数ph_nbio_emitter_init中, 每1个emitter,创建1个timerfd,100ms后,定时器超时,timefd成为可读,触发回调函数tick_epoll;

  emitter->timer_fd = timerfd_create(
  CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC);
  if (emitter->timer_fd == -1) {
    ph_panic("timerfd_create(CLOCK_MONOTONIC) failed: `Pe%d", errno);
  }

  memset(&ts, 0, sizeof(ts));
  ts.it_interval.tv_nsec = WHEEL_INTERVAL_MS * 1000000;
  ts.it_value.tv_nsec = ts.it_interval.tv_nsec;
  timerfd_settime(emitter->timer_fd, 0, &ts, NULL);

  ph_job_init(&emitter->timer_job);
  emitter->timer_job.callback = tick_epoll;
  emitter->timer_job.fd = emitter->timer_fd;
  emitter->timer_job.data = emitter;
  emitter->timer_job.emitter_affinity = emitter->emitter_id;
  ph_job_set_nbio(&emitter->timer_job, PH_IOMASK_READ, 0);

调用顺序: ph_nbio_emitter_init -> ph_job_set_nbio -> tick_epoll -> ph_nbio_emitter_timer_tick -> ph_timerwheel_tick

hash table

源码: include/phenom/hashtable.h;corelib/hash; tests/hashtable.c;

struct ph_ht {
  uint32_t nelems;
  uint64_t table_size, elem_size, mask;
  const struct ph_ht_key_def *kdef;
  const struct ph_ht_val_def *vdef;
  /* points to the table, an array of table_size elements */
  char *table;
};
  
ph_result_t ph_ht_init(ph_ht_t *ht, uint32_t size_hint,
  const struct ph_ht_key_def *kdef,
  const struct ph_ht_val_def *vdef)
{
  ht->kdef = kdef;
  ht->vdef = vdef;
  ht->nelems = 0;
  ht->table_size = ph_power_2(size_hint * 2);
  ht->elem_size = sizeof(struct ph_ht_elem) + kdef->ksize + vdef->vsize;
  ht->mask = ht->table_size - 1;
  ht->table = ph_mem_alloc_size(mt_table, ht->elem_size * ht->table_size);
  if (!ht->table) {
    return PH_NOMEM;
  }

  return PH_OK;
}

采用的是linear probing的实现。 hash桶的大小在ph_ht_init的时候传入。
如果桶满了, insert就会失败。 需要显性地调用。
ph_ht_grow来手动建立hash表, 没有rehash的过程。
ph_hash_bytes_murmur函数实现了Murmur Hash算法。

phenom线程

源码: include/phenom/thread.h; corelib/thread.c

struct ph_thread {
  bool refresh_time;
  // internal monotonic thread id
  uint32_t tid;

  PH_STAILQ_HEAD(pdisp, ph_job) pending_nbio, pending_pool;
  struct ph_nbio_emitter *is_emitter;

  int is_worker;
  struct timeval now;

  ck_epoch_record_t epoch_record;
  ck_hs_t counter_hs;
  // linkage so that a stat reader can find all counters
  ck_stack_entry_t thread_linkage;

  // OS level representation
  pthread_t thr;

  // If part of a pool, linkage in that pool
  CK_LIST_ENTRY(ph_thread) pool_ent;

  pid_t lwpid;

#ifdef HAVE_STRERROR_R
  char strerror_buf[128];
#endif

  // Name for debugging purposes
  char name[16];
};

Phenom线程上记录了

  • pending NBIO job 队列
  • pending pool job 队列
  • pthread_t 线程id
  • 在pool中的结点
  • name

每个phenom线程分配一个全局唯一的id,对应一个pthread线程。 如注释所说,tid < MAX_RINGS的phenom线程称为preferred thread, 拥有自己专用的job队列,其他线程竞争共享队列,用spinlock同步。

全局的pools将所有线程池保存在链表中。其中包含用于consumer和producer等待/唤醒的结构(futex或condition variable), 保存job的ring buffer、worker线程的指针等等信息。

ph_thread_spawn(func, arg)创建一个ph_thread_t线程。 实际上是调用pthread_create(),让其执行ph_thread_boot(),将实际要执行的函数func() 和参数arg等信息传入。ph_thread_boot()会分配内存并创建一个新的ph_thread_t结构, 执行一些初始化,然后调用传入的那个func()。

此外,封装了join、self、setaffinity等等pthread操作。

pthread_key_t

1个进程中线程直接除了线程自己的栈和寄存器之外,其他几乎都是共享的,如果线程想维护一个只属于线程自己的全局变量怎么办?
线程的私有存储解决了这个问题。

  • 创建一个类型为 pthread_key_t 类型的变量。
  • 调用 pthread_key_create() 来创建该变量。该函数有两个参数,第一个参数就是上面声明的 pthread_key_t 变量,
    第二个参数是一个清理函数,用来在线程释放该线程存储的时候被调用。该函数指针可以设成 NULL ,
    这样系统将调用默认的清理函数。当线程中需要存储特殊值的时候,可以调用 pthread_setspecific() 。
    该函数有两个参数,第一个为前面声明的 pthread_key_t 变量,第二个为 void* 变量,这样你可以存储任何类型的值。
  • 如果需要取出所存储的值,调用pthread_getspecific() 。该函数的参数为前面提到的 pthread_key_t 变量,
    该函数返回 void *类型的值。 pthread_key_t无论是哪一个线程创建,其他所有的线程都是可见的,
    即一个进程中只需phread_key_create()一次。看似是全局变量,然而全局的只是key值,
    对于不同的线程对应的value值是不同的(通过pthread_setspcific()和pthread_getspecific()设置)。

ph_thread_self函数就用这个方式取得线程自己的句柄

JOB

job有3类

  • Immediate. The work is dispatched immediately on the calling thread. 接口 ph_job_dispatch_now
  • NBIO. The work is dispatched when a descriptor is signalled for I/O.
  • Pool. The work is queued to a thread pool and is dispatched as soon as a worker becomes available.
    libPhenom allows multiple pools to be defined to better partition and prioritize your workload

NBIO

源码

  • include/phenom/job.h
  • tests/bench/iopipes.c
  • tests/iobasic.c
  • tests/timer.c
  • 目录corelib/nbio

ph_nbio_init()初始化NBIO。

  • calloc()分配num_schedulers个emitter, 定义在struct ph_nbio_emitter, 每个emitter会跟1个事件循环,和1个thread绑定。
  • 初始化每个emitter及其timer wheel(ph_timerwheel_init()、ph_nbio_emitter_init())
    • timer_fd描述符(timerfd_create())
    • timer_job扔进pending_nbio队列,这个job被调度到时执行tick_epoll
  • 初始化counter, 用来进行统计, 可以用ph_nbio_stat进行观察

每个emitter绑定了1个事件循环

    struct ph_nbio_emitter {
      ph_timerwheel_t wheel;    // 时间轮
      ph_job_t timer_job;
      uint32_t emitter_id;
      struct timeval last_dispatch;
      int io_fd, timer_fd;
      ph_nbio_affine_job_stailq_t affine_jobs;   // typedef PH_STAILQ_HEAD(affine_ent, ph_nbio_affine_job)
      ph_job_t affine_job;
      ph_pingfd_t affine_ping;    // 用来唤醒epoll
      ph_thread_t *thread;        // 跟thread绑定在一起
      ph_counter_block_t *cblock;   // 计数器
    };
    
    struct ph_job {
      // data associated with job
      void *data;
      // the callback to run when the job is dispatched
      ph_job_func_t callback;
      // deferred apply list
      PH_STAILQ_ENTRY(ph_job) q_ent;
      // whether we're in a deferred apply
      bool in_apply;
      // for PH_RUNCLASS_NBIO, trigger mask */
      ph_iomask_t mask;
      // use ph_job_get_kmask() to interpret
      int kmask;
      // Hashed over the scheduler threads; two jobs with
      // the same emitter hash will run serially wrt. each other
      uint32_t emitter_affinity;
      // For nbio, the socket we're bound to for IO events
      ph_socket_t fd;
      // Holds timeout state
      struct ph_timerwheel_timer timer;
      // When targeting a thread pool, which pool
      ph_thread_pool_t *pool;
      // for SMR
      ck_epoch_entry_t epoch_entry;
      struct ph_job_def *def;
    };

ph_sched_run调度NBIO

  • 对emitters中的每个线程执行sched_loop(),emitters[0]是ph_sched_run的调用者;
  • sched_loop 调用 ph_nbio_emitter_run
  • ph_nbio_emitter_run进入Reactore模式,epoll_wait得到fd, ph_job_t *job = event[i].data.ptr;
  • ph_nbio_emitter_dispatch_immediate回调job之前设置的callback
  • 定时器任务通过timefd来触发

job加入NBIO

  • 通过ph_job_set_nbio加入JOB (ph_job_set_nbio_timeout_in实际调用ph_job_set_nbio

    • 如果当前me.is_worker == 0, 执行ph_nbio_emitter_apply_io_mask,对相关的fd进行epoll_wait
    • 否则放入队列me->pending_nbio
  • 放入pending_nbio队列的job, 通过ph_sched_run --> process_deferred --> ph_nbio_emitter_apply_io_mask;
    加入事件循环中

  • sched_run开始后,新加入job; ph_nbio_emitter_run --> ph_job_pool_apply_deferred_items --> process_deferred

Thread Pool

源码: include/phenom/thread.h; corelib/job.h; corelib/job.c; tests/tpool.c

    struct ph_thread_pool {
      struct ph_thread_pool_wait consumer CK_CC_CACHELINE;

      uint32_t max_queue_len;

      ck_ring_t *rings[MAX_RINGS+1];
      intptr_t used_rings;

      ck_spinlock_t lock CK_CC_CACHELINE;
      char pad1[CK_MD_CACHELINE - sizeof(ck_spinlock_t)];

      struct ph_thread_pool_wait producer CK_CC_CACHELINE;
      int stop;

      char *name;
      ph_counter_scope_t *counters;
      CK_LIST_ENTRY(ph_thread_pool) plink;
      ph_thread_t **threads;

      uint32_t max_workers;
      uint32_t num_workers;

      ph_variant_t *config;
    };

job分发的过程

  • ph_thread_pool_define 定义1个pool;
  • 函数ph_job_set_pool; job->pool = pool; 关联job和pool; PH_STAILQ_INSERT_TAIL(&me->pending_pool, job, q_ent); 放入当前线程的队列
  • 执行 ph_sched_run --> process_deferred --> _ph_job_set_pool_immediate --> do_set_pool
    • tid < MAX_RINGS有自己单独的ring, 其他的共享1个ring.
    • wake_pool(&pool->consumer); 通知worker线程。

job的处理

  • ph_sched_run --> _ph_job_pool_start_threads --> ph_thread_pool_start_workers --> worker_thread

ph_thread_pool_signal_stop函数用来终止

socket

源码:

  • include/phenom/socket.h
  • 目录 corelib/net
  • tests/sockaddr.c

libphenom对socket io进行了封装。包括描述符ph_socket_t, 通用的地址结构phenom_sockaddr,
ph_sock_t封装了读写buffer、用于NBIO的job结构、超时时长、事件发生后的callback等信息。
ph_sock_t由NBIO pool管理。

解析域名并发起连接的过程:

 struct resolve_and_connect {
   ph_sockaddr_t addr;
   ph_socket_t s;
   int resolve_status;
   int connect_status;
   uint16_t port;
   struct timeval start, timeout, elapsed;
   void *arg;
   ph_sock_connect_func func;
};  

def ph_sock_resolve_and_connect(name, port, timeout, resolver, func,, args):
    rac = ph_mem_alloc(mt.resolve_and_connect)
    rac.func = func;
    rac.arg = arg;
    rac.start = ph_time_now();
    rac.port = port;
    if timeout:
      rac.timeout = timeout 
    else:
      rac.timeout = 60  # 默认60s超时
    if ph_sockaddr_set_v4(rac.addr, name, port) == PH_OK:  # 如果name是IP地址
      attempt_connect(rac)
      return
    # 根据resolver采用不同的解析域名的方式,
    rac.addr = dns_getaddrinfo(resolver)
    attempt_connect(rac)
    
def attempt_connect(rac):
    # 建立socket对象
    rac.s = ph_socket_for_addr(rac.addr, SOCK_STREAM, PH_SOCK_CLOEXEC|PH_SOCK_NONBLOCK) 
    ph_socket_connect(rac.s, rac.addr, rac.timeout, connected_sock, rac)
    
  
struct connect_job {
   ph_job_t job;
   ph_socket_t s;
   ph_sockaddr_t addr;
   int status;
   struct timeval start; 
   void *arg;
   ph_socket_connect_func func;
};

def ph_socket_connect(s, addr, timeout, func, arg):
    # connect_job_template = { callback = connect_complete, memtype = mt.connect_job}
    job = (struct connect_job*)ph_job_alloc(connect_job_template)
    
    job.s, job.addr, job.func, job.arg = s, addr, func, arg

    job.start = ph_time_now();
    res = connect(s, job.addr ...)   # man 2 connect
    if (...)  # 
      # 如果s对应fd是异步方式,使用事件回调机制, 回调函数是connect_complete
      job.job.fd = s
      job.job.callback = connect_complete
      job.job.data = job
      ph_job_set_nbio_timeout_in(&job->job, PH_IOMASK_WRITE,
          timeout ? *timeout : default_timeout);
      return;
 
    # 同步IO, 直接调用connected_sock
    done = job.stat  - now
    func(s, addr, res == 0 ? 0 : errno, done, arg);
  
def connect_complete(ph_job_t *j, ph_iomask_t why, void *data):
    struct connect_job *job = data
    if why == PH_IOMASK_TIME:
      status = ETIMEDOUT
    # 回调之前注册的函数, connected_sock
    job.func(job.s, job.addr, status, done, job.arg)
    

def connected_sock(s, addr, status, elapsed, arg):
    struct resolve_and_connect *rac = arg;
    sock = ph_sock_new_from_socket(s, NULL, addr)
    calc_elapsed(rac)
    # 回调用户定义的函数 , 类型是ph_sock_connect_func
    rac.func(sock, PH_SOCK_CONNECT_SUCCESS, 0, addr,  rac.elapsed, rac.arg);

ph_sock_t, 对1个socket连接的抽象:

struct ph_sock {
  // Embedded job so we can participate in NBIO
  ph_job_t job;

  // Buffers for output, input
  ph_bufq_t *wbuf, *rbuf;

  // The per IO operation timeout duration
  struct timeval timeout_duration;

  // A stream for writing to the underlying connection
  ph_stream_t *conn;
  // A stream representation of myself.  Writing bytes into the
  // stream causes the data to be buffered in wbuf
  ph_stream_t *stream;

  // Dispatcher
  ph_sock_func callback;
  bool enabled;

  // sockname, peername as seen from this host.
  // These correspond to the raw connection we see; if we are
  // proxied, these are the names of our connection to the proxy.
  // If we are not proxied, these are the same as the equivalents below
  ph_sockaddr_t via_sockname, via_peername;
  // sockname, peername as seen from the connected peer
  // These are the actual outgoing address endpoints, independent of
  // any proxying that may be employed
  ph_sockaddr_t sockname, peername;

  // If we've switched up to SSL, holds our SSL context
  SSL *ssl;
  ph_stream_t *ssl_stream;
  ph_sock_openssl_handshake_func handshake_cb;
  ph_bufq_t *sslwbuf;
};

// 创建ph_sock_t
// connected_sock, accept_dispatch函数调用
def ph_sock_new_from_socket(ph_socket_t s, ph_sockaddr_t *sockname, ph_sockaddr_t *peername):
    # sock_job_template = {sock_dispatch, mt.sock}, 分配的结构体是ph_sock_t
    sock = (ph_sock_t*)ph_job_alloc(&sock_job_template)
    
    # 读写buf默认大小为128k
    max_buf = ph_config_query_int("$.socket.max_buffer_size", MAX_SOCK_BUFFER_SIZE)
    sock->wbuf = ph_bufq_new(max_buf)
    sock->rbuf = ph_bufq_new(max_buf)
    
    sock->conn = ph_stm_fd_open(s, 0, 0)
    sock->stream = ph_stm_make(&sock_stm_funcs, sock, 0, 0)
    # sockname记录本地地址, peer记录对端地址
    sock->sockname = *sockname
    sock->peername = *peername
    # 默认60s超时
    sock->timeout_duration.tv_sec = 60
    
    return sock

// 加入nbio的方式, 以ph_sock_connect_func回调取例
  def connect_cb(ph_sock_t *sock, ...):
      # 设置回调函数, 并开启
      sock->callback = remote_cb
      ph_sock_enable(sock, true);
      
// 当ph_sock对应的fd有event发生时,nbio回调的入口函数是
def sock_dispatch(j, why, data):
    # SSL暂时不关心, 先skip这些代码
    ph_sock_t *sock = (ph_sock_t*)j;
    sock->conn->need_mask = 0;
    
    // 把wbuf里缓存的数据写入fd
    try_send(sock)
    
    // 从系统中读取数据到rbuf
    try_read(sock)
    
    // 设置对应的mask, 回调用户注册的函数
    // ....
    sock->callback(sock, why, data);
      
// 释放ph_socket_t,当发现需要关闭连接时
  ph_sock_shutdown(sock, PH_SOCK_SHUT_RDWR);
  //  如果sock->job.data之前有malloc数据,这里需要释放
  ph_mem_free(mt_state, state);   
  ph_sock_free(sock);

sock的读写:

  • ph_sock_new_from_socket调用ph_stm_fd_open, ph_stm_make管理fd到sock.conn
  • sock_stm_funcs, 定义了socket的读写操作
  • 读sock, 实际上读sock->rbuf,具体见sock_stm_readv
  • 写sock, 实际上写sock->wbuf, 具体见sock_stm_writev
  • 实际读写对应的fd, 是在sock_dispatch中进行的。 所以如果需要立刻发送数据出去,调用ph_sock_wakeup

skynet 学习笔记 - snlua调度

上文详细讲了沙盒服务 snlua 的启动过程。本文内容是snlua 服务收到消息后,如何进行调度。

协程调度

snlua 服务通过 skynet.start 进行启动。会触发绑定C消息回调函数

// 来自lua-skynet.c
static int
lcallback(lua_State *L) {
    // 省略...
    lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD);
    lua_State *gL = lua_tothread(L,-1);
    skynet_callback(context, gL, _cb);
}

注意: callback 绑定的ud 是Lua VM的主协程。
因此,服务收到的消息,会驱动主协程执行 skynet.raw_dispatch_message.

那么,Lua VM 只用一个协程,来处理所有的消息吗?

不是的。实际上新的请求会对应一个协程。 协程创建的代码是

-- 来自 skynet.lua
local coroutine_pool = setmetatable({}, { __mode = "kv" })

local function co_create(f)
	local co = table.remove(coroutine_pool)
	if co == nil then
		co = coroutine.create(function(...)
			f(...)
			while true do
				f = nil
				coroutine_pool[#coroutine_pool+1] = co
				f = coroutine_yield "EXIT"
				f(coroutine_yield())
			end
		end)
	else
		coroutine_resume(co, f)
	end
	return co
end

为提高性能,采用了协程池。每个新请求,先从协程池中取,如果没有,就重新创建一个。处理完请求后,重新放入协程池。

请求回应模式

在实际使用 skynet 时,你可以直接使用 rpc 的语法,向外部服务发起一个远程调用,等对方发送了回应消息后,逻辑接着走下去。

那么,框架是如何把回调函数的模式转换为阻塞 API 调用的形式呢?

用官方的例子来解释

-- 来自 skynet/example/agent.lua
	local r = skynet.call("SIMPLEDB", "lua", "get", self.what)

每一个客户端连接上skynet后,会有一个对应的服务 agent.

服务 simpledb 收到 agent 消息后,执行过程如下

    -- 主协程执行
    raw_dispatch_message(prototype, msg, sz, session, source)
        -- 请求类型 prototype ~= PTYPE_RESPONSE。执行新请求分支
        local p = proto[prototype]
        -- 这里对应 simpledb.lua 注册的分发函数
        local f = p.dispatch
        -- 假设刚开始池子是空的。创建一个新的
        local co = co_create(f)
        -- 记录session 和消息发来的服务地址
		session_coroutine_id[co] = session
		session_coroutine_address[co] = source
        -- 协程切换, 子协程开始执行
		suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))

    -- 子协程执行
    -- 入口函数是 co_create 定义的传入 coroutine.create 的闭包
        -- 先执行分发函数, 定义在 simpledb.lua 里
        f(...)
            local f = command[string.upper(cmd)]
            -- f 这里对应于 command.GET
            skynet.ret(skynet.pack(f(...)))
                -- 协程切换,回主协程
                return coroutine_yield("RETURN", msg, sz)
            

    -- 主协程执行
    suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
    -- suspend(co, true, "RETURN", msg, sz)
        -- 进入 command == "RETURN" 分支
        -- 回复消息
        ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size) ~= nil
        -- 协程切换,子协程开始执行
        return suspend(co, coroutine_resume(co, ret))

    -- 子协程执行
    skynet.ret(skynet.pack(f(...)))
    -- ret 正常返回
    -- 分发函数正常返回, f(...)
    -- 重新返回 co_create 定义的闭包
    -- 这个协程被放入池中,可以被复用       
    coroutine_pool[#coroutine_pool+1] = co
    -- 协程切换,返回主协程
    f = coroutine_yield "EXIT"

    -- 主协程
    -- command == "RETURN" 分支
    return suspend(co, coroutine_resume(co, ret))  -- suspend(co, true, "EXIT")
    -- 这时候调用栈  raw_dispatch_message --> suspend --> suspend
    -- "EXIT" 做清理工作,内层 suspend 返回
    -- 从 raw_dispatch_message 正常返回,主协程退出

上面子协程是新创建的,如果是复用协程

    -- 主协程执行
    raw_dispatch_message(prototype, msg, sz, session, source)
        -- 假设刚开始池子是空的。创建一个新的
        local co = co_create(f)
            -- 返回的不是nil,说明协程池中还有协程可以复用。
            local co = table.remove(coroutine_pool)
            -- 进入 else 分支
            -- 切换协程
            coroutine_resume(co, f)
    
    -- 子协程执行
    -- 之前挂起在这个地方,f 就是对应的分发函数
    f = coroutine_yield "EXIT"
    -- 协程切换,返回主协程
    f(coroutine_yield())


    -- 主协程
    -- 从这个函数挂起的地方返回
    local co = co_create(f)
    -- 协程切换, 子协程开始执行
	suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
    
    -- 子协程
    -- 从这里返回。等同于  f(...)
    f(coroutine_yield())
    -- 下面调度过程跟新创建的情况一样
    -- 执行完后,最终会回收此协程,并调用 coroutine_yield “EXIT”等待下一次的复用

服务 agent 执行过程

    -- 子协程
    skynet.call("SIMPLEDB", "lua", "get", self.what)
        -- 发送消息,得到唯一的session id
        local session = c.send(addr, p.id , nil , p.pack(...))
        -- 执行 yield_call
        return p.unpack(yield_call(addr, session))
            -- 这里当前子线程先挂起,等待收到回包。具体细节再看下面
            local succ, msg, sz = coroutine_yield("CALL", session)

    -- 主协程执行
    suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
    -- suspend(co, true, "CALL", session)
    -- 进入 "CALL" 分支。关联 session --> co
    -- 主协程从 raw_dispatch_message 正常返回
    
    -- 当收到回复包的时候,主协程执行
    -- raw_dispatch_message 进入 PTYPE_RESPONSE 分支
    -- 协程切换,执行之前挂起的子协程
    local co = session_id_coroutine[session]
    suspend(co, coroutine_resume(co, true, msg, sz))

    -- 子协程
    local succ, msg, sz = coroutine_yield("CALL", session)
    -- skynet.call 返回,一次RPC调用已经完成
    

总结:

  • 对于 skynet.call ,其实是生成了一个对当前服务来说唯一的 session 号。
  • 调用 yield 给框架发送 "CALL" 这个指令,并自动挂起,切换控制权到框架。
  • 框架中的 suspend 捕获到 "CALL" 后,就会把 session 和 coroutine 对象记录在表中,结束当前的回调函数。
  • 当收到回应消息时,框架会根据 session 号找到之前记录的 coroutine 对象。然后 resume 之前没有做完的业务即可。
  • 从应用层角度看起来,就只是一次阻塞调用而已。

用户级线程

skynet 框架下不能直接使用 Lua coroutine库。

如果你需要创建用户级线程,可以使用 skynet.fork ,skynet.wait,skynet.wakeup。

如果你有其它原因想使用 coroutine, 请考虑清楚为什么需要,然后参考这篇文章Skynet Coroutinue.

参考资料

skynet 学习笔记 - socket

skynet 网络层

skynet 核心机制,是没有网络层的。对网络层的处理,是由单独的线程来完成的。

架构图

示意图

  • 运行服务的线程,把网络请求,打包成统一请求格式,通过管道进行发送。
  • 多线程写管道,没有超过系统的限制,是原子性的。不需要加锁。
  • socket 线程会不停地执行函数 skynet_socket_poll, 顺序执行以下
    • ctrl_cmd 从管道读取网络操作请求,并调用对应的socket 接口
    • 不停地处理事件队列的事件,转成处理结果
    • 通过函数 forward_message,投递结果到,发送请求的服务的消息队列里

核心数据结构

网络功能的核心结构

struct socket_server {
    int recvctrl_fd;        // 接收管道消息的文件描述
    int sendctrl_fd;        // 发送管道消息的文件描述
    int checkctrl;          // 判断是否有其他线程通过管道,向socket线程发送消息的标记变量
    poll_fd event_fd;       // epoll实例id
    int alloc_id;           // 已经分配的socket slot列表id
    int event_n;            // 标记本次epoll事件的数量
    int event_index;        // 下一个未处理的epoll事件索引
    struct socket_object_interface soi;
    struct event ev[MAX_EVENT]; // epoll事件列表
    struct socket slot[MAX_SOCKET];  // socket 列表
};

服务所在的工作线程,通过管道的 sendctrl_fd 写入数据。 socket 线程,通过 recvctrl_fd进行读取网络操作请求。

操作系统的fd, 都会在 slot 里分配一个对应的 socket 结构。

struct socket {
    uintptr_t opaque;       // 与本socket关联的服务地址,socket接收到的消息,最后将会传送到这个服务商
    struct wb_list high;    // 高优先级发送队列
    struct wb_list low;     // 低优先级发送队列
    int64_t wb_size;        // 发送字节大小
    int fd;                 // socket文件描述符
    int id;                 // 位于socket_server的slot列表中的位置
    uint16_t protocol;      // 使用的协议tcp or udp 
    uint16_t type;          // epoll事件触发时,会根据type来选择处理事件的逻辑
};

其中 type 会表示当前 socket 的状态,会根据这个状态触发不同的逻辑。

服务端代码

监停指定端口,并发处理客户端的请求。代码如下

-- 来着 skynet/service/debug_console.lua
local socket = require "skynet.socket"

local listen_socket = socket.listen (ip, port)
socket.start(listen_socket , function(id, addr)
    local function print(...)
      local t = { ... }
      for k,v in ipairs(t) do
        t[k] = tostring(v)
      end
      socket.write(id, table.concat(t,"\t"))
      socket.write(id, "\n")
    end
    socket.start(id)
    skynet.fork(console_main_loop, id , print)
  end)

注意点:

  • socket.listen ,会listen 对应的端口,但因为并没有把listen id 放入事件循环
  • 任何一个服务只有在调用 socket.start(id) 之后,才可以收到这个 socket 上的数据。skynet 框架是根据调用 start 这个 api 的位置来决定把对应 socket 上的数据转发到哪里去的。
  • 每个新连接的客户端,建议 skynet.fork 出一个协程单独处理。这种写法,跟多线程并发处理是一样的。

listen 过程

Lua 执行

local listen_socket = socket.listen (ip, port)

调用过程如下:

  • 调用 socket.listen @socket.lua
    • 调用 static int llisten(lua_State * L) @lua-socket.c
      • 调用 socket_server_listen @ socket_server.c
        • 调用 do_listen,获取监听的fd
        • 从插槽 slot 分配唯一的 id
        • send_request 发送类别是 ”L“ 的请求,参数是: 当前服务ID,id, 监听fd
  • Lua 层,返回唯一的id

接着执行

socket.start(listen_socket, function ()... end)
  • 调用 socket.start(id, func) @socket.lua
    • driver.start(id) 实际调用 [email protected]
      • skynet_socket_start@skynet_socket 调用 socket_server_start@socket_server.c
        • send_request 发送类别是 “R" 的请求,给 socket 线程
      • 调用 connect(id, func), 每个ID,在Lua. 对应一个表,会放入 socket_pool 里
      • 调用 suspend(s), skynet.wait(s.co) 挂起当前协程,等待被唤醒。
      • 合适的时间,被唤醒

socket 线程,是并发执行C 代码的。 不停地在执行 skynet_socket_poll

  • 调用 socket_server_poll,处理管道的操作命令,和网络的请求
    • has_cmd 检测是否有操作指令,ctrl_cmd 处理指令
      • 类别 “L“ 请求,执行 listen_socket
      • new_fd 绑定fd 和 slot 里的struct socket, 并进行初始化
        • 调用 sp_add, 把监听fd,加入到事件循环里
      • s->type 设置为 SOCKET_TYPE_PLISTEN
      • 函数并没有返回数据给任何Lua 服务
    • 处理第2个请求,是 “R” 类别,执行 resume_socket
      • s->opaque 绑定为调用 socket.start 的服务ID,之后网络数据都会转发到这个服务
      • s->type 状态从 SOCKET_TYPE_PLISTEN 改成 SOCKET_TYPE_LISTEN
      • struct socket_message * result ,返回结果里,result->data 设置为 “start”
      • 函数返回值是 SOCKET_OPEN
  • 执行以下代码, 转发结果为,给Lua 服务
// skynet_socket_poll
case SOCKET_OPEN:
		forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);
		break;

socket 消息

forward_message,封装了一个类别是 PTYPE_SOCKET 的请求,数据是 struct skynet_socket_message的消息,

放入到服务opaque 的消息队列里。

debug_console.lua. 对这个消息的处理,是定义在

// lualib/skynet/socket.lua
skynet.register_protocol {
	name = "socket",
	id = skynet.PTYPE_SOCKET,	-- PTYPE_SOCKET = 6
	unpack = driver.unpack,
	dispatch = function (_, _, t, ...)
		socket_message[t](...)
	end
}

数据unpack 执行的是 lunpack @ lua-socket.c. 接下去执行 dispatch, 传入的参数是

socket_message[SKYNET_SOCKET_TYPE_CONNECT = 2](result->id, result->ud, "start")

执行的回调函数就是

-- SKYNET_SOCKET_TYPE_CONNECT = 2
socket_message[2] = function(id, _ , addr)
	local s = socket_pool[id]
	if s == nil then
		return
	end
	-- log remote addr
	if not s.connected then	-- resume may also post connect message
		s.connected = true  # 表示成功连接
		wakeup(s)
	end
end

local function wakeup(s)
	local co = s.co
	if co then
		s.co = nil
    -- 这里并没有 yield。 只是把 co 放入 wakeup_queue. 
    -- 等到下一个调度,重新 resume co
		skynet.wakeup(co)
	end
end

这样,找到之前挂起的那个协程,重新唤醒它。 Lua 之后执行的代码,会跳转到 这里

accept 过程

socket 线程,轮循 socket_server_poll。 检测到读时间

  • sp_wait 返回的事件,根据 s->type 是SOCKET_TYPE_LISTEN, 判断出是有新连接
  • report_accept , accpet 出新的client fd, 分配新的 struct socket
    • 这里 opaque 指向的,还是对 listen id 调用 start
  • socket_server_poll 返回值是 SOCKET_ACCEPT
  • 转发结果为Lua 服务
// 	skynet_socket_poll	
case SOCKET_ACCEPT:
		forward_message(SKYNET_SOCKET_TYPE_ACCEPT, true, &result);
		break;

服务 debug_console,触发逻辑是

  • socket.lua , 执行代码 socket_message[4] , 回调之前的 s.callback
  • 执行的是 debug_console.lua 里的闭包函数
socket.start(listen_socket , function(id, addr)
		// 	...
		socket.start(id)
		skynet.fork(console_main_loop, id , print)
	end)

调用 socket.start(id, func) @socket.lua , 协程挂起。 参考上面的流程,这个挂起,之后会被唤醒。

skynet.fork 出一个新的协程,并发地处理每个客户端的读写操作。

read 数据

针对其中一个客户端,执行 console_main_loop ,执行

local cmdline = socket.readline(stdin, "\n")v

如果 s.buffer 读缓冲区里有 “\n", 就直接读取并返回。 否则就会挂起

	local ret = driver.readline(s.buffer, buffer_pool, sep)
	if ret then
		return ret
	end

	assert(not s.read_required)
	s.read_required = sep
	-- 在这里挂起
	suspend(s)

socket 线程里,客户端每次发过来新的数据,最终会触发服务执行代码. socket_message[1] @socket.lua

local s = socket_pool[id]
local sz = driver.push(s.buffer, buffer_pool, data, size)
local rr = s.read_required   -- 现在这里应该是 "\n"
local rrt = type(rr)

-- 下面逻辑走 else
if rrt == "string" then
			-- read line
			if driver.readline(s.buffer,nil,rr) then
				s.read_required = nil
				wakeup(s)

wakeup 后, 最终框架会继续驱动上面挂起的 socket.readline 返回,继续执行之前的逻辑。

skynet 学习笔记 - 服务

这篇文章,讲Skynet 调度机制中另外一个重点是:服务。

  • 服务的结构
  • 创建服务
  • 消息回调
  • 日志服务
  • 参考资料

服务

把一个符合规范的 C 模块,从动态库(so 文件)中加载进来。绑定一个永不重复的数字 id 做为其 handle 。

这个运行的模块就是服务。 服务本质上是C语言的一个结构体。

模块和服务的关系,可以理解为程序和进程。

struct skynet_context {
	void * instance;              // 指针,指向模块create的结构体
	struct skynet_module * mod;   // 对动态库 so 的封装
	void * cb_ud;                 // 消息回调的上下文
	skynet_cb cb;                 // 消息回调
	struct message_queue *queue;  // 服务所属的消息队列
	FILE * logfile;               // 如果存在,会通过 skynet_log_output 记录消息到这个文件中
	char result[32];              // 保存相关函数的临时结果
	uint32_t handle;              // 服务的 handle, 具有唯一性
	int session_id;               // 请求回应模式
	int ref;                      // 引用计数
	int message_count;            // 记录服务总共处理了多少个消息
	...
};

创建服务

通过下面函数,可以创建一个服务。

struct skynet_context * 
skynet_context_new(const char * name, const char *param);

skynet_context_new 主要的操作是

  • 获取/加载C服务的动态库 (so 文件)
  • 为服务分配 skynet_context ,并生成一个唯一的handle
  • 调用动态库的create函数, 绑定到字段 instance 上
  • 为服务分配消息队列
  • 调用 xxx_init 函数,进行初始化工作
  • 将消息队列压入全局消息队列中

每一个实现C 服务的模块, 约定实现以下4个接口

    struct skynet_module {
        const char * name;			// so 文件名称
        void * module;				// dlopen 返回的指针
        skynet_dl_create create;    // 创建XX服务。 必须
        skynet_dl_init init;        // 初始化XX服务实例。 可选
        skynet_dl_release release;  // 释放XX服务实例。 可选
        skynet_dl_signal signal;    // XX服务信号处理器。 可选
    };

消息回调

每个服务可以向 Skynet 框架注册一个 callback 函数,用来接收发给它的消息。

每个服务都是被一个个消息包驱动,当没有包到来的时候,它们就会处于挂起状态,对CPU资源零消耗。

可以通过这个函数设置回调

void 
skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
	context->cb = cb;
	context->cb_ud = ud;
}

callback 函数的定义

typedef int (*skynet_cb)(
  struct skynet_context * context,
  void *ud, 
  int type, 
  int session, 
  uint32_t source,
  const void * msg,
  size_t sz
);
  • ud, 是服务的上下文。 skynet_callback 设置了这个值。
  • type, 表示的是当前消息包的协议组别。 具体的可选值见 skynet.h 里的宏 PTYPE_XXX。
  • session, 请求回应模式中需要。通过这个值,可以找到Lua 虚拟机里挂起的协程。
  • source, 消息来源,表示回应地址。
  • msg/sz ,就是数据包

对指定服务发送消息

int skynet_send(
  struct skynet_context * context, 
  uint32_t source, 
  uint32_t destination,
  int type,
  int session,
  void * msg, 
  size_t sz
);

日志服务

日志服务 logger, 是启动的第一个服务。 它负责记其他服务的日志输出。

例如,下面的Lua代码,通过日志服务打印了一行日志

	skynet.error("Watchdog listen on", 8888)

skynet.error 实现是这样的

  • skynet.error 是 lua-skynet.c#lerror 函数的别名
  • lerror 函数做了下面的事情
    • 调用底层函数 skynet_error
    • skynet_error 功能
      • 格式化参数
      • 把最终的字符串,通过 skynet_context_push,放入log服务的消息队列中

那么, logger 服务是如何实现的 ?

struct logger {
	FILE * handle;
	char * filename;
	int close;
};

struct logger *
logger_create(void) {
	struct logger * inst = skynet_malloc(sizeof(*inst));
	inst->handle = NULL;
	inst->close = 0;
	inst->filename = NULL;

	return inst;
}

logger 服务创建的时候, 会附带一个数据结构 struct logger . 从这个结构知道,日志可以输出到指定的一个文件。

int
logger_init(struct logger * inst, struct skynet_context *ctx, const char * parm) {
	if (parm) {
		inst->handle = fopen(parm,"w");
		if (inst->handle == NULL) {
			return 1;
		}
		inst->filename = skynet_malloc(strlen(parm)+1);
		strcpy(inst->filename, parm);
		inst->close = 1;
	} else {
		inst->handle = stdout;
	}
	if (inst->handle) {
		skynet_callback(ctx, inst, logger_cb);
		skynet_command(ctx, "REG", ".logger");
		return 0;
	}
	return 1;
}

初始化时,init 根据是否传入参数,决定了日志是输出到文件或者标准输出。

再看看回调函数 logger_cb

static int
logger_cb(struct skynet_context * context, void *ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
	struct logger * inst = ud;
	switch (type) {
	case PTYPE_SYSTEM:
		if (inst->filename) {
			inst->handle = freopen(inst->filename, "a", inst->handle);
		}
		break;
	case PTYPE_TEXT:
		fprintf(inst->handle, "[:%08x] ",source);
		fwrite(msg, sz , 1, inst->handle);
		fprintf(inst->handle, "\n");
		fflush(inst->handle);
		break;
	}

	return 0;
}
  • PTYPE_TEXT 类型的消息是普通的日志。
  • PTYPE_SYSTEM 类型的消息,会触发重新打开日志文件。 这个功能在日志备份的时候非常有用。

下面,我们会讲一讲,使用最多的 Lua 沙盒服务。

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.