前言

在上一篇文章中,介绍了客户端同步调用的具体实现的,主要还是通过调用传输模块的相关函数,完成发送和接收远程调用的数据。

在调用多个远程方法时,同步调用是以串行的方式执行的,导致运行效率比较低,所以需要使用并行调用来提高调用多个远程方法的运行效率,减少整体运行的时间。

并行传输模块

yar_transport_multi_t

在数据传输模块中,我们提到了 yar_transport_multi_t *multi 就是用来实现并行调用的,先来看看该结构体的定义。

1
2
3
4
5
6
// source:yar_transport.h

// 并行传输器的工厂
typedef struct _yar_transport_multi {
struct _yar_transport_multi_interface * (*init)();
} yar_transport_multi_t;

可以将 yar_transport_multi_t 看作是并行传输器的工厂,使用 init 函数创建并行传输器的实例,也就是为 _yar_transport_multi_interface 结构体分配内存,并初始化相关配置。

yar_transport_multi_interface_t

1
2
3
4
5
6
7
8
9
// source:yar_transport.h

// 并行传输器
typedef struct _yar_transport_multi_interface {
void *data;
int (*add)(struct _yar_transport_multi_interface *self, yar_transport_interface_t *cp);
int (*exec)(struct _yar_transport_multi_interface *self, yar_concurrent_client_callback *callback);
void (*close)(struct _yar_transport_multi_interface *self);
} yar_transport_multi_interface_t;

其实 yar_transport_multi_interface_t 结构体和 yar_transport_interface_t 结构体类似,都是用来定义在传输时使用的函数,下面是字段说明:

  • data:存储并行传输器的数据指针,指向 yar_curl_multi_data_t 结构体。
  • add:函数指针,将一个同步传输器(yar_transport_interface_t)实例中的数据存储到并行传输器中。
  • exec:函数指针,执行请求,将所有的同步传输器实例中的数据发送出去。
  • close:函数指针,释放相关资源。

从上面的说明可以看出来,并行传输器实际上是在管理多个同步传输器实例,先调用 add 函数将同步传输器实例存储到并行传输器中,然后通过 exec 函数将这些同步传输器实例中的数据发送出去,并对响应结果进行处理,最后调用 close 函数清理相关资源。

yar_curl_multi_data_t

虽然 yar_curl_multi_data_t 结构体的字段比较少,但是通过这个结构体,可以知道并行传输器是如何存储同步传输器实例的数据的。

1
2
3
4
5
6
// source:transports/curl.c

typedef struct _yar_curl_multi_data_t {
CURLM *cm;
yar_transport_interface_t *chs;
} yar_curl_multi_data_t;

cm 字段用来存储 curl 批处理实例的指针,chs 则是同步传输器的指针,同步传输器中 data 字段指向的结构体中有一个 next 字段,用来指向下一个同步传输器:

1
2
3
4
5
6
// source:transports/curl.c

typedef struct _yar_curl_data_t {
// ...
yar_transport_interface_t *next;
} yar_curl_data_t;

看到这里你应该就明白了,Yar 通过单链表的结构将所有的同步传输器实例串起来, yar_curl_multi_data_t 中的 chs 字段相当于头指针。

并行客户端

先来看看并行客户端的方法列表及属性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// source:yar_client.c

// 方法列表
zend_function_entry yar_concurrent_client_methods[] = {
PHP_ME(yar_concurrent_client, call, arginfo_client_async, ZEND_ACC_PUBLIC|ZEND_ACC_STATIC)
PHP_ME(yar_concurrent_client, loop, arginfo_client_loop, ZEND_ACC_PUBLIC|ZEND_ACC_STATIC)
PHP_ME(yar_concurrent_client, reset,arginfo_client_void, ZEND_ACC_PUBLIC|ZEND_ACC_STATIC)
PHP_FE_END
};

INIT_CLASS_ENTRY(ce, "Yar_Concurrent_Client", yar_concurrent_client_methods);
yar_concurrent_client_ce = zend_register_internal_class(&ce);
// 声明并行客户端的属性
zend_declare_property_null(yar_concurrent_client_ce, ZEND_STRL("_callstack"), ZEND_ACC_PROTECTED|ZEND_ACC_STATIC);
zend_declare_property_null(yar_concurrent_client_ce, ZEND_STRL("_callback"), ZEND_ACC_PROTECTED|ZEND_ACC_STATIC);
zend_declare_property_null(yar_concurrent_client_ce, ZEND_STRL("_error_callback"), ZEND_ACC_PROTECTED|ZEND_ACC_STATIC);
zend_declare_property_bool(yar_concurrent_client_ce, ZEND_STRL("_start"), 0, ZEND_ACC_PROTECTED|ZEND_ACC_STATIC);

由此我们可以知道并行客户端大概长这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class {
protected static $_callstack = null; // 存储每个调用的数据
protected static $_callback = null; // 并行客户端的回调函数
protected static $_error_callback = null; // 并行客户端的异常回调函数
protected static $_start = false; // 是否已启动

// 调用某个方法
public static function call($uri, $method, $parameters = null, $callback = null, $error_callback = null, $options = array())
{

}

// 执行调用并等待响应
public static function loop($callback = null, $error_callback = null)
{

}

// 重置并行客户端
public static function reset()
{

}
}

Yar 定义了 yar_call_data_t 结构体将 call 方法的参数存起来,当作远程调用的数据。

1
2
3
4
5
6
7
8
9
10
11
// source:yar_transport.h

typedef struct _yar_call_data {
zend_long sequence;
zend_string *uri;
zend_string *method;
zval callback;
zval ecallback;
zval parameters;
zval options;
} yar_call_data_t;

在 PHP 中使用该结构体前,需要在 Yar 启动时向 PHP 注册该资源类型,同时传入析构函数,用于 PHP 释放该资源的时候调用。

1
2
3
4
5
6
7
YAR_STARTUP_FUNCTION(transport) {
// ...

le_calldata = zend_register_list_destructors_ex(php_yar_calldata_dtor, NULL, "Yar Call Data", module_number);

return SUCCESS;
}

zend_register_list_destructors_ex 函数会返回一个整数,表示注册的资源类型,在取回资源时需要用到该整数。

并行调用

这次我们使用 gdb 调试的方式,一步步的展示并行调用的过程,在此之前需要编译好 Yar,编译时记得关闭编译优化,并在 php.ini 中配置好扩展,然后启动开篇中的 Yar 服务端示例

Yar_Concurrent_Client::call

客户端的并行调用示例中,先调用了 call 方法,然后调用了 loop 方法,所以我们分别在这两个方法的上打断点。通过上面并行客户端的介绍可以知道,实现 call 方法的函数是 zim_yar_concurrent_client_call,我们在该函数上打断点即可。

1
2
3
4
5
6
$ gdb php
(gdb) b zim_yar_concurrent_client_call
Function "zim_yar_concurrent_client_call" not defined.
Make breakpoint pending on future shared library load? (y or [n]) y
Breakpoint 1 (zim_yar_concurrent_client_call) pending.
(gdb)

开始运行 PHP 并指定运行的 PHP 脚本。

1
2
3
4
5
6
7
8
9
10
11
(gdb) r concurrent_client.php
Starting program: /usr/bin/php concurrent_client.php
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".

Breakpoint 1, zim_yar_concurrent_client_call (
execute_data=0x5555557de9ee <zend_restore_lexical_state+222>,
return_value=0x555555a2b240 <language_scanner_globals>)
at /home/vagrant/code/her-cat/yar/yar_client.c:645
645 PHP_METHOD(yar_concurrent_client, call) {
(gdb)

zim_yar_concurrent_client_call 函数作用是将 call 方法的参数组装为 yar_call_data_t 结构体,并存储到并行客户端的 _callstack 数组中,然后返回本次调用的序号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// source:yar_client.c

PHP_METHOD(yar_concurrent_client, call) {
zend_string *uri, *method;
zend_string *name = NULL;
zval *callstack, item, *status;
zval *error_callback = NULL, *callback = NULL, *parameters = NULL, *options = NULL;
yar_call_data_t *entry;

// 解析 call 方法中的参数
if (zend_parse_parameters(ZEND_NUM_ARGS(), "SS|a!z!za",
&uri, &method, &parameters, &callback, &error_callback, &options) == FAILURE) {
return;
}

// 省略一些对参数校验的代码...


// 校验并行客户端的状态
status = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_start"), 0);
if (UNEXPECTED(Z_TYPE_P(status) == IS_TRUE)) {
php_error_docref(NULL, E_WARNING, "concurrent client has already started");
RETURN_FALSE;
}

// 分配调用数据的内存
entry = ecalloc(1, sizeof(yar_call_data_t));

// 将参数赋值到调用数据上
entry->uri = zend_string_copy(uri);
entry->method = zend_string_copy(method);

if (callback && !Z_ISNULL_P(callback)) {
ZVAL_COPY(&entry->callback, callback);
}
if (error_callback && !Z_ISNULL_P(error_callback)) {
ZVAL_COPY(&entry->ecallback, error_callback);
}
if (parameters && IS_ARRAY == Z_TYPE_P(parameters)) {
ZVAL_COPY(&entry->parameters, parameters);
}
if (options && IS_ARRAY == Z_TYPE_P(options)) {
ZVAL_COPY(&entry->options, options);
}

// 初始化调用栈数组
callstack = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callstack"), 0);
if (Z_ISNULL_P(callstack)) {
zval rv;
array_init(&rv);
zend_update_static_property(yar_concurrent_client_ce, ZEND_STRL("_callstack"), &rv);
ZVAL_ARR(callstack, Z_ARRVAL(rv));
Z_DELREF(rv);
}

// 将调用数据注册到 zend 中,并返回 zend_resouce 类型的数据
ZVAL_RES(&item, zend_register_resource(entry, le_calldata));

// 设置调用数据的序号
entry->sequence = zend_hash_num_elements(Z_ARRVAL_P(callstack)) + 1;

// 将 zend_resource 类型的调用数据存储到 callstack 中
zend_hash_next_index_insert(Z_ARRVAL_P(callstack), &item);

RETURN_LONG(entry->sequence);
}

让代码执行到最后一行,并打印 entry 中的数据。

1
2
3
4
5
6
7
8
9
10
11
(gdb) u 733
zim_yar_concurrent_client_call (execute_data=0x7ffff54130d0,
return_value=0x7fffffffaa80)
at /home/vagrant/code/her-cat/yar/yar_client.c:733
733 RETURN_LONG(entry->sequence);
(gdb) p *entry->uri.val@32
$5 = "http://127.0.0.1:3000/server.php"
(gdb) p *entry->method.val@5
$6 = "login"
(gdb) p entry->sequence
$7 = 1

Yar_Concurrent_Client::loop

先在 loop 方法上打断点,然后让代码执行到断点处。

1
2
3
4
5
6
7
8
9
(gdb) b zim_yar_concurrent_client_loop
Breakpoint 2 at 0x7ffff56e97f7: file /home/vagrant/code/her-cat/yar/yar_client.c, line 751.
(gdb) c
Continuing.

Breakpoint 1, zim_yar_concurrent_client_call (execute_data=0x7ffff54130d0, return_value=0x7fffffffaa80)
at /home/vagrant/code/her-cat/yar/yar_client.c:645
645 PHP_METHOD(yar_concurrent_client, call) {
(gdb)

在 zim_yar_concurrent_client_loop 函数中,先校验 loop 方法的参数及并行客户端的运行状态,然后从并行客户端中读取 _callstack 数组,调用 php_yar_concurrent_client_handle 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// source:yar_client.c

PHP_METHOD(yar_concurrent_client, loop) {
zend_string *name = NULL;
zval *callstack;
zval *callback = NULL, *error_callback = NULL;
zval *status;
unsigned ret = 0;

// 解析参数
if (zend_parse_parameters(ZEND_NUM_ARGS(), "|zz", &callback, &error_callback) == FAILURE) {
return;
}

// 判断运行状态
status = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_start"), 0);
if (UNEXPECTED(Z_TYPE_P(status) == IS_TRUE)) {
php_error_docref(NULL, E_WARNING, "concurrent client has already started");
RETURN_FALSE;
}

// 省略一些对参数校验的代码...

// 读取所有的调用数据
callstack = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callstack"), 0);
if (Z_ISNULL_P(callstack) || zend_hash_num_elements(Z_ARRVAL_P(callstack)) == 0) {
RETURN_TRUE;
}

// 更新为运行中的状态
ZVAL_BOOL(status, 1);
ret = php_yar_concurrent_client_handle(callstack);
ZVAL_BOOL(status, 0);
RETURN_BOOL(ret);
}

php_yar_concurrent_client_handle

让代码执行到调用 php_yar_concurrent_client_handle 的前一行,然后进入函数内部。

1
2
3
4
5
6
7
8
9
10
(gdb) u 801
zim_yar_concurrent_client_loop (execute_data=0x7ffff54130d0, return_value=0x7fffffffaa80)
at /home/vagrant/code/her-cat/yar/yar_client.c:801
801 ZVAL_BOOL(status, 1);
(gdb) s
802 ret = php_yar_concurrent_client_handle(callstack);
(gdb) s
php_yar_concurrent_client_handle (callstack=0x0) at /home/vagrant/code/her-cat/yar/yar_client.c:456
456 int php_yar_concurrent_client_handle(zval *callstack) /* {{{ */ {
(gdb)

php_yar_concurrent_client_handle 函数的主要逻辑:

  • 遍历 callstack 数组
    • 为每一个调用数据创建同步传输器。
    • 分别调用同步传输器中的 open、send、calldata 等函数初始化调用请求。
    • 调用 multi->add 函数将这些同步传输器存储到并行调用器中。
  • 调用 multi->exec 函数将这些请求发送出去。
  • 收到响应结果后调用 php_yar_concurrent_client_callback 函数进行处理。
  • 最后调用 multi->close 函数释放相关资源。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// source:yar_client.c

int php_yar_concurrent_client_handle(zval *callstack) /* {{{ */ {
char *msg;
zval *calldata;
yar_request_t *request;
const yar_transport_t *factory;
yar_transport_interface_t *transport;
yar_transport_multi_interface_t *multi;

// 获取 curl 传输器工厂实例
factory = php_yar_transport_get(ZEND_STRL("curl"));
// 创建并行传输器实例
multi = factory->multi->init();

// 遍历 callstack 数组
ZEND_HASH_FOREACH_VAL(Z_ARRVAL_P(callstack), calldata) {
yar_call_data_t *entry;
long flags = 0;

entry = (yar_call_data_t *)zend_fetch_resource(Z_RES_P(calldata), "Yar Call Data", le_calldata);

if (!entry) {
continue;
}

if (Z_ISUNDEF(entry->parameters)) {
array_init(&entry->parameters);
}

// 为每一个调用数据创建同步传输器
transport = factory->init();

if (!Z_ISUNDEF(entry->options)) {
zval *flag = php_yar_client_get_opt(&entry->options, YAR_OPT_PERSISTENT);
if (flag && (Z_TYPE_P(flag) == IS_TRUE || (Z_TYPE_P(flag) == IS_LONG && Z_LVAL_P(flag)))) {
flags |= YAR_PROTOCOL_PERSISTENT;
}
}

// 创建调用请求
if (!(request = php_yar_request_instance(entry->method,
&entry->parameters, Z_ISUNDEF(entry->options)? NULL: & entry->options))) {
transport->close(transport);
factory->destroy(transport);
return 0;
}

// 创建一个连接
msg = (char*)&entry->options;
if (!transport->open(transport, entry->uri, flags, &msg)) {
php_yar_client_trigger_error(1, YAR_ERR_TRANSPORT, msg);
transport->close(transport);
factory->destroy(transport);
efree(msg);
return 0;
}

// 对请求数据进行编码、组装等操作,并将处理后的数据保存到连接中
if (!transport->send(transport, request, &msg)) {
php_yar_client_trigger_error(1, YAR_ERR_TRANSPORT, msg);
transport->close(transport);
factory->destroy(transport);
efree(msg);
return 0;
}

// 将调用数据保存到同步传输器中
transport->calldata(transport, entry);
// 将同步传输器存储到并行调用器中。
multi->add(multi, transport);
// 释放请求
php_yar_request_destroy(request);
} ZEND_HASH_FOREACH_END();

// 执行所有请求
if (!multi->exec(multi, php_yar_concurrent_client_callback)) {
multi->close(multi);
return 0;
}

// 释放资源
multi->close(multi);
return 1;
}

我们看下并行传输器中函数指针对应的函数。

1
2
3
(gdb) p *multi
$10 = {data = 0x7ffff5461050, add = 0x7ffff56eea4e <php_yar_curl_multi_add_handle>,
exec = 0x7ffff56ef1e4 <php_yar_curl_multi_exec>, close = 0x7ffff56ef6af <php_yar_curl_multi_close>}

可以看到 add、exec、close 分别对应 php_yar_curl_multi_add_handle、php_yar_curl_multi_exec、php_yar_curl_multi_close 等函数,分别在这些函数上打断点,然后继续运行程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
(gdb) b php_yar_curl_multi_add_handle
Breakpoint 3 at 0x7ffff56eea4e: file /home/vagrant/code/her-cat/yar/transports/curl.c, line 604.
(gdb) b php_yar_curl_multi_exec
Breakpoint 4 at 0x7ffff56ef1e4: file /home/vagrant/code/her-cat/yar/transports/curl.c, line 748.
(gdb) b php_yar_curl_multi_close
Breakpoint 5 at 0x7ffff56ef6af: file /home/vagrant/code/her-cat/yar/transports/curl.c, line 951.
(gdb) c
Continuing.

Breakpoint 3, php_yar_curl_multi_add_handle (self=0x7ffff5402428, handle=0x7ffff548e060)
at /home/vagrant/code/her-cat/yar/transports/curl.c:604
warning: Source file is more recent than executable.
604 int php_yar_curl_multi_add_handle(yar_transport_multi_interface_t *self, yar_transport_interface_t *handle) /* {{{ */ {
(gdb)

php_yar_curl_multi_add_handle

php_yar_curl_multi_add_handle 函数的作用:将同步传输器实例以头插法保存到 chs 单链表中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// source:transports/curl.c

int php_yar_curl_multi_add_handle(yar_transport_multi_interface_t *self, yar_transport_interface_t *handle) {
yar_curl_multi_data_t *multi = (yar_curl_multi_data_t *)self->data;
yar_curl_data_t *data = (yar_curl_data_t *)handle->data;

// 预处理,将传输数据保存到 curl 中
php_yar_curl_prepare(handle);

// 将同步传输器的 curl 实例添加到 curl 批处理实例中
curl_multi_add_handle(multi->cm, data->cp);

// 头插法保存
if (multi->chs) {
data->next = multi->chs;
multi->chs = handle;
} else {
multi->chs = handle;
}

return 1;
}

php_yar_curl_multi_exec

php_yar_curl_multi_exec 函数的作用:通过 epoll 或 select 多路复用机制监控文件描述符的状态,并配合 curl_multi_* 系列函数对请求进行处理。

以下是主要逻辑:

  • 从 curl 批处理实例中取出所有请求的文件描述符。
  • 调用 select 函数监听文件描述符的状态,阻塞当前进程。
  • 进程从休眠状态恢复后,调用 curl_multi_perform 处理响应数据。
  • 调用 php_yar_curl_multi_parse_response 函数解析响应数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// source:transports/curl.c

int php_yar_curl_multi_exec(yar_transport_multi_interface_t *self, yar_concurrent_client_callback *f) /* {{{ */ {
int running_count, rest_count;
yar_curl_multi_data_t *multi = (yar_curl_multi_data_t *)self->data;

// 尝试让 libcurl 处理数据,并获取当前正在运行中的数量
while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi->cm, &running_count));

// 第一次调用回调函数,让调用方执行自己的逻辑。
if (!f(NULL, YAR_ERR_OKEY, NULL)) {
goto bailout;
}

if (running_count) {
rest_count = running_count;
do {
int max_fd, return_code;
struct timeval tv;
fd_set readfds;
fd_set writefds;
fd_set exceptfds;

FD_ZERO(&readfds);
FD_ZERO(&writefds);
FD_ZERO(&exceptfds);

// 从 curl 批处理实例中取出所有请求的文件描述符
curl_multi_fdset(multi->cm, &readfds, &writefds, &exceptfds, &max_fd);
if (max_fd == -1) {
// max_fd 为 -1 说明 libcurl 正在处理一些不能使用套接字监控的事情,
// 所以需要等待一会再调用 curl_multi_perform
long timeout;
curl_multi_timeout(multi->cm, &timeout);
if (timeout < 0) {
timeout = 50;
}
if (timeout) {
tv.tv_sec = timeout / 1000;
tv.tv_usec = (timeout % 1000) * 1000;
select(1, &readfds, &writefds, &exceptfds, &tv);
}
// 让 libcurl 处理数据
while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi->cm, &running_count));
goto process;
}

// 计算 select 超时时间
tv.tv_sec = (zend_ulong)(YAR_G(timeout) / 1000);
tv.tv_usec = (zend_ulong)((YAR_G(timeout) % 1000)? (YAR_G(timeout) % 1000) * 1000 : 0);

// 等待请求返回数据
return_code = select(max_fd + 1, &readfds, &writefds, &exceptfds, &tv);
if (return_code > 0) {
// 说明某些文件描述符可读/可写,让 libcurl 进行处理
while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi->cm, &running_count));
} else if (-1 == return_code) {
// 发生异常
php_error_docref(NULL, E_WARNING, "select error '%s'", strerror(errno));
goto onerror;
} else {
// 等待超时
php_error_docref(NULL, E_WARNING, "select timeout %ldms reached", YAR_G(timeout));
goto onerror;
}
process:
if (rest_count > running_count) {
// 解析响应的数据
int ret = php_yar_curl_multi_parse_response(multi, f);
if (ret == -1) {
goto bailout;
} else if (ret == 0) {
goto onerror;
}
rest_count = running_count;
}
} while (running_count);
} else {
// 第一次尝试让 libcurl 处理数据后,
// 如果运行中的数量为 0 ,说明已经全部处理完了,
// 直接进行解析
int ret = php_yar_curl_multi_parse_response(multi, f);
if (ret == -1) {
goto bailout;
} else if (ret == 0) {
goto onerror;
}
}

return 1;
onerror:
return 0;
bailout:
self->close(self);
zend_bailout();
return 0;
}

php_yar_curl_multi_parse_response

在 php_yar_curl_multi_parse_response 函数上打断点,然后继续运行程序。

1
2
3
4
5
6
7
8
9
(gdb) b php_yar_curl_multi_parse_response
Breakpoint 6 at 0x7ffff56eeadb: file /home/vagrant/code/her-cat/yar/transports/curl.c, line 622.
(gdb) c
Continuing.

Breakpoint 6, php_yar_curl_multi_parse_response (multi=0x20b666f03b932500, f=0x16fb)
at /home/vagrant/code/her-cat/yar/transports/curl.c:622
622 static int php_yar_curl_multi_parse_response(yar_curl_multi_data_t *multi, yar_concurrent_client_callback *f) /* {{{ */ {
(gdb)

php_yar_curl_multi_parse_response 函数的主要逻辑:

  • 从 curl 批处理实例中取出所有收到的响应数据。
  • 遍历 multi->chs 找到数据对应的同步传输器。
  • 检查响应数据是否正确。
  • 解析响应数据,得到 Yar 协议中的头部及 payload 部分的内容。
  • 调用 php_yar_concurrent_client_callback 函数,执行对应的回调函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// source:transports/curl.c

static int php_yar_curl_multi_parse_response(yar_curl_multi_data_t *multi, yar_concurrent_client_callback *f) {
int msg_in_sequence;
CURLMsg *msg;

do {
// 从 curl 批处理实例中读取一条消息,并返回剩余消息数量(msg_in_sequence)
msg = curl_multi_info_read(multi->cm, &msg_in_sequence);
if (msg && msg->msg == CURLMSG_DONE) {
// 标记是否找到了对应的同步传输器
unsigned found = 0;
yar_transport_interface_t *handle = multi->chs, *q = NULL;

// 遍历单链表
while (handle) {
// curl 实例的指针地址相等,说明找到了
if (msg->easy_handle == ((yar_curl_data_t*)handle->data)->cp) {
// 从单链表中移除
if (q) {
((yar_curl_data_t *)q->data)->next = ((yar_curl_data_t*)handle->data)->next;
} else {
multi->chs = ((yar_curl_data_t*)handle->data)->next;
}
found = 1;
break;
}
q = handle;
handle = ((yar_curl_data_t*)handle->data)->next;
}

if (found) {
long http_code = 200;
yar_response_t *response;
yar_curl_data_t *data = (yar_curl_data_t *)handle->data;

// 创建响应实例
response = php_yar_response_instance();

if (msg->data.result == CURLE_OK) {
curl_multi_remove_handle(multi->cm, data->cp);

// 获取 HTTP 状态码
if(curl_easy_getinfo(data->cp, CURLINFO_RESPONSE_CODE, &http_code) == CURLE_OK && http_code != 200) {
// 非 200 说明出现异常
// 返回异常响应
continue;
} else {
// 状态码为 200
if (data->buf.s) {
// ...

// 从响应数据中解析出 Yar 协议的头部信息
if (!(header = php_yar_protocol_parse(payload))) {
php_yar_error(response, YAR_ERR_PROTOCOL, "malformed response header '%.32s'", payload);
} else {
// 跳过 Yar 协议的头部信息后的内容,就是真正的响应结果
payload += sizeof(yar_header_t);
payload_len -= sizeof(yar_header_t);

// 通过 payload 中的编码方式对 payload 进行解码
if (!(retval = php_yar_packager_unpack(payload, payload_len, &msg, &ret))) {
php_yar_response_set_error(response, YAR_ERR_PACKAGER, msg, strlen(msg));
} else {
// 将解码后的数据存储到 response 中
php_yar_response_map_retval(response, retval);
DEBUG_C(ZEND_ULONG_FMT": server response content packaged by '%.*s', len '%ld', content '%.32s'", response->id, 7, payload, header->body_len, payload + 8);
zval_ptr_dtor(retval);
}
if (msg) {
efree(msg);
}
}
} else {
// 响应内容为空,设置错误信息
php_yar_response_set_error(response, YAR_ERR_EMPTY_RESPONSE, ZEND_STRL("empty response"));
}

// 调用 php_yar_concurrent_client_callback 函数
if (!f(data->calldata, response->status, response)) {
handle->close(handle);
php_yar_response_destroy(response);
return -1;
}
if (EG(exception)) {
handle->close(handle);
php_yar_response_destroy(response);
return 0;
}
}
} else {
// 执行请求失败,返回失败原因
char *err = (char *)curl_easy_strerror(msg->data.result);
php_yar_response_set_error(response, YAR_ERR_TRANSPORT, err, strlen(err));
if (!f(data->calldata, YAR_ERR_TRANSPORT, response)) {
handle->close(handle);
php_yar_response_destroy(response);
return -1;
}
if (EG(exception)) {
handle->close(handle);
php_yar_response_destroy(response);
return 0;
}
}
handle->close(handle);
php_yar_response_destroy(response);
} else {
php_error_docref(NULL, E_WARNING, "unexpected transport info missed");
}
}
} while (msg_in_sequence);

return 1;
}

php_yar_concurrent_client_callback

该函数的作用:通过响应状态判断本次远程调用的结果,执行相应的回调函数。

主要逻辑:

  • 通过响应状态获取要执行的 PHP 回调函数。
  • 组装被调用函数的参数。
  • 执行 PHP 回调函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// source:yar_client.c

int php_yar_concurrent_client_callback(yar_call_data_t *calldata, int status, yar_response_t *response) /* {{{ */ {
zval code, retval, retval_ptr;
zval callinfo, *callback, func_params[3];
zend_bool bailout = 0;
unsigned params_count, i;

if (calldata) {
// 通过响应状态获取要执行的 PHP 回调函数
if (status == YAR_ERR_OKEY) {
if (!Z_ISUNDEF(calldata->callback)) {
callback = &calldata->callback;
} else {
callback = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callback"), 0);
}
params_count = 2;
} else {
if (!Z_ISUNDEF(calldata->ecallback)) {
callback = &calldata->ecallback;
} else {
callback = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_error_callback"), 0);
}
params_count = 3;
}

// 没获取到回调函数就提示错误信息
if (Z_ISNULL_P(callback)) {
if (status != YAR_ERR_OKEY) {
if (!Z_ISUNDEF(response->err)) {
php_yar_client_handle_error(0, response);
} else {
php_error_docref(NULL, E_WARNING, "[%d]:unknown Error", status);
}
} else if (!Z_ISUNDEF(response->retval)) {
zend_print_zval(&response->retval, 1);
}
return 1;
}

if (status == YAR_ERR_OKEY) {
// 响应状态是成功但是返回值为空,提示错误信息。
if (Z_ISUNDEF(response->retval)) {
php_yar_client_trigger_error(0, YAR_ERR_REQUEST, "%s", "server responsed empty response");
return 1;
}
// 复制返回值
ZVAL_COPY(&retval, &response->retval);
} else {
// 复制状态及错误信息
ZVAL_LONG(&code, status);
ZVAL_COPY(&retval, &response->err);
}

// 初始化调用信息
array_init(&callinfo);

add_assoc_long_ex(&callinfo, "sequence", sizeof("sequence") - 1, calldata->sequence);
add_assoc_str_ex(&callinfo, "uri", sizeof("uri") - 1, zend_string_copy(calldata->uri));
add_assoc_str_ex(&callinfo, "method", sizeof("method") - 1, zend_string_copy(calldata->method));
} else {
// 调用数据为空则获取并行客户端的回调函数
callback = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callback"), 0);
if (Z_ISNULL_P(callback)) {
return 1;
}
params_count = 2;
}

if (calldata && (status != YAR_ERR_OKEY)) {
// 调用失败
ZVAL_COPY_VALUE(&func_params[0], &code);
ZVAL_COPY_VALUE(&func_params[1], &retval);
ZVAL_COPY_VALUE(&func_params[2], &callinfo);
} else if (calldata) {
// 调用成功
ZVAL_COPY_VALUE(&func_params[0], &retval);
ZVAL_COPY_VALUE(&func_params[1], &callinfo);
} else {
// 调用数据为空
ZVAL_NULL(&func_params[0]);
ZVAL_NULL(&func_params[1]);
}

zend_try {
// 执行回调函数
if (call_user_function(EG(function_table), NULL, callback, &retval_ptr, params_count, func_params) != SUCCESS) {
for (i = 0; i < params_count; i++) {
zval_ptr_dtor(&func_params[i]);
}
if (calldata) {
php_error_docref(NULL, E_WARNING, "call to callback failed for request: '%s'", ZSTR_VAL(calldata->method));
} else {
php_error_docref(NULL, E_WARNING, "call to initial callback failed");
}
return 1;
}
} zend_catch {
bailout = 1;
} zend_end_try();

// 释放返回值及参数
if (!Z_ISUNDEF(retval_ptr)) {
zval_ptr_dtor(&retval_ptr);
}

for (i = 0; i < params_count; i++) {
zval_ptr_dtor(&func_params[i]);
}
return bailout? 0 : 1;
}

打印 retval 可以看到服务端的返回值:success。

1
2
(gdb) p *retval.value.str.val@7
$28 = "success"

总结

本文介绍了实现并行调用相关的数据结构,并通过 GDB 调试代码的方式展示了并行调用运行的过程。