前言

在前面几篇文章中,更多的是在研究 Yar 传输的内容,比如协议的格式是什么样的、如何对数据进行编码等等。

今天这篇文章,主要介绍 Yar 编码模块的结构体定义,以及 HTTP 传输方式的实现,更深入的了解 Yar 的协议数据是如何被发送出去的。

传输模块结构体

RPC 通信协议 中有提到过,Yar 支持 HTTP 和 TCP 两种数据传输方式,前者使用 curl,后者使用 socket。

yar_transport_t

我们可以将 yar_transport_t 结构体看作是同步传输器的工厂,用于创建和释放同步传输器的实例。

1
2
3
4
5
6
7
8
9
// source:yar_transport.h

// 同步传输器的工厂
typedef struct _yar_transport {
const char *name;
struct _yar_transport_interface * (*init)();
void (*destroy)(yar_transport_interface_t *self);
yar_transport_multi_t *multi;
} yar_transport_t;

结构体字段说明:

  • name:传输方式的名称,比如 “curl”。
  • init:函数指针,创建同步传输器实例并初始化相关配置,函数的返回值是结构体 _yar_transport_interface 的指针。
  • destroy:函数指针,释放某个同步传输器的资源,函数的参数是结构体 _yar_transport_interface 的指针。
  • multi:用于实现并行调用的结构体指针,这部分会在并行调用进行详解。

yar_transport_interface_t

在 yar_transport_t 结构体中,只定义了 init 和 destroy 这两个函数指针,用于对同步传输器的创建和释放。而真正用于传输时使用的相关函数,则放在了 yar_transport_interface_t 结构体中。

1
2
3
4
5
6
7
8
9
10
11
12
// source:yar_transport.h

// 同步传输器
typedef struct _yar_transport_interface {
void *data;
int (*open)(struct _yar_transport_interface *self, zend_string *address, long options, char **msg);
int (*send)(struct _yar_transport_interface *self, struct _yar_request *request, char **msg);
struct _yar_response * (*exec)(struct _yar_transport_interface *self, struct _yar_request *request);
int (*setopt)(struct _yar_transport_interface *self, long type, void *value, void *addition);
int (*calldata)(struct _yar_transport_interface *self, yar_call_data_t *calldata);
void (*close)(struct _yar_transport_interface *self);
} yar_transport_interface_t;

结构体说明:

  • data:存储传输相关的数据指针,比如 curl 是 yar_curl_data_t 结构体,socket 是 yar_socket_data_t 结构体。
  • open:函数指针,打开/创建一个连接,并初始化相关默认值。
  • send:函数指针,对请求数据进行编码、组装等操作,并将处理后的数据保存到连接中。
  • exec:函数指针,执行请求,将连接中的数据发送出去。
  • setopt:函数指针,设置连接相关的选项,比如编码方式、超时时间。
  • calldata:函数指针,设置回调时的数据,在并行调用时才会用到。
  • close:函数指针:关闭连接并释放相关的资源。

HTTP 传输方式

Yar 支持的传输方式都放在了 transports 目录下,因为 HTTP 是 Yar 默认的数据传输方式,并且服务端也只实现了 HTTP 这种方式,所以我们这里用 HTTP 进行举例。

用 c 语言实现 HTTP 请求的类库有很多,Yar 使用的是 curl,所以在 transports 目录下的文件命名是 curl.c。

yar_transport_t

根据前面传输模块的介绍,首先需要实现 yar_transport_t 结构体。

1
2
3
4
5
6
7
8
// source:transports/curl.c

const yar_transport_t yar_transport_curl = {
"curl",
php_yar_curl_init,
php_yar_curl_destroy,
&yar_transport_curl_multi
};

curl 是传输方式的名称,php_yar_curl_init 和 php_yar_curl_destroy 是 init 和 destroy 这两个函数指针的实现,yar_transport_curl_multi 用来实现并行调用。

先来看看 php_yar_curl_init 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// source:transports/curl.c

yar_transport_interface_t *php_yar_curl_init() {
yar_curl_data_t *data;
yar_transport_interface_t *self;

// 分配 yar_transport_interface_t 结构体及数据的内存
self = ecalloc(1, sizeof(yar_transport_interface_t));
self->data = data = ecalloc(1, sizeof(yar_curl_data_t));

// 设置 Yar 的 HTTP 头部信息
/* snprintf(content_type, sizeof(content_type), "Content-Type: %s", YAR_G(content_type)); */
data->headers = curl_slist_append(data->headers, "User-Agent: PHP Yar RPC-" PHP_YAR_VERSION);
data->headers = curl_slist_append(data->headers, "Expect:");

// 设置各个函数指针对应的函数实现
self->open = php_yar_curl_open;
self->send = php_yar_curl_send;
self->exec = php_yar_curl_exec;
self->setopt = php_yar_curl_setopt;
self->calldata = php_yar_curl_set_calldata;
self->close = php_yar_curl_close;

// 分配响应数据及请求数据的内存,默认为 1M
smart_str_alloc((&data->buf), YAR_PACKAGER_BUFFER_SIZE /* 1M */, 0);
smart_str_alloc((&data->postfield), YAR_PACKAGER_BUFFER_SIZE /* 1M */, 0);

// 返回 yar_transport_interface_t 结构体的指针
return self;
}

由于没有资源需要在 destroy 的时候进行释放,所以 php_yar_curl_destroy 是一个空函数。

1
2
3
4
// source:transports/curl.c

void php_yar_curl_destroy(yar_transport_interface_t *self) {
}

yar_transport_interface_t

接下来看看 curl 中 yar_transport_interface_t 相关的实现。

首先是 php_yar_curl_open 函数,以下是该函数的主要逻辑:

  • 如果开启了连接持久化,则尝试从持久化的连接池中找到未使用的连接。
  • 没有找到符合条件的连接或者未开启连接持久化,则直接创建一个新的 curl 实例。
  • 解析请求地址,并将请求信息及 curl 实例保存到 data 中,方便后续在其它函数中使用。
  • 如果开启了连接持久化,则设置 keep-alive 及 keep-alive 有效时间的头部信息。
  • 设置自定义的头部信息及域名。
  • 设置 curl 收到响应时的回调函数及其它 curl 选项。
  • 设置 curl 请求地址。

打开连接之后,调用 php_yar_curl_send 函数对请求数据进行编码、组装等操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// source:transports/curl.c

int php_yar_curl_send(yar_transport_interface_t* self, yar_request_t *request, char **msg) {
yar_header_t header = {0};
yar_curl_data_t *data = (yar_curl_data_t *)self->data;
zend_string *payload;

// 对请求数据进行编码
if (!(payload = php_yar_request_pack(request, msg))) {
return 0;
}

// 通过编码后的数据初始化 header 结构体
php_yar_protocol_render(&header, request->id, data->host->user, data->host->pass, ZSTR_LEN(payload), 0);

// 将 header 及 payload 按照顺序追加到请求数据的 buf 中
smart_str_appendl(&data->postfield, (char *)&header, sizeof(yar_header_t));
smart_str_appendl(&data->postfield, ZSTR_VAL(payload), ZSTR_LEN(payload));
zend_string_release(payload);

return 1;
}

数据准备就绪后,就可以调用 php_yar_curl_exec 函数执行请求,将数据发送出去并解析响应结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// source:transports/curl.c

yar_response_t *php_yar_curl_exec(yar_transport_interface_t* self, yar_request_t *request) {
// ...

// 设置请求数据及大小
curl_easy_setopt(data->cp, CURLOPT_POSTFIELDS, ZSTR_VAL(data->postfield.s));
curl_easy_setopt(data->cp, CURLOPT_POSTFIELDSIZE, ZSTR_LEN(data->postfield.s));

// 设置超时时间等选项
if (IS_ARRAY == Z_TYPE(request->options)) {
zval *pzval;
if ((pzval = zend_hash_index_find(Z_ARRVAL(request->options), YAR_OPT_TIMEOUT))) {
convert_to_long_ex(pzval);
self->setopt(self, YAR_OPT_TIMEOUT, (long *)&Z_LVAL_P(pzval), NULL);
}
if ((pzval = zend_hash_index_find(Z_ARRVAL(request->options), YAR_OPT_CONNECT_TIMEOUT))) {
convert_to_long_ex(pzval);
self->setopt(self, YAR_OPT_CONNECT_TIMEOUT, (long *)&Z_LVAL_P(pzval), NULL);
}
if ((pzval = zend_hash_index_find(Z_ARRVAL(request->options), YAR_OPT_PROXY))) {
convert_to_string_ex(pzval);
self->setopt(self, YAR_OPT_PROXY, (char *)&Z_STRVAL_P(pzval), NULL);
}
}

response = php_yar_response_instance();

// 执行请求
ret = curl_easy_perform(data->cp);
if (ret != CURLE_OK) {
// 请求失败则将失败信息保存到 response 中并返回
len = spprintf(&msg, 0, "curl exec failed '%s'", curl_easy_strerror(ret));
php_yar_response_set_error(response, YAR_ERR_TRANSPORT, msg, len);
efree(msg);
return response;
} else {
long http_code;
// 检查响应状态码是否等于 200
if(curl_easy_getinfo(data->cp, CURLINFO_RESPONSE_CODE, &http_code) == CURLE_OK
&& http_code != 200) {
len = spprintf(&msg, 0, "server responsed non-200 code '%ld'", http_code);
php_yar_response_set_error(response, YAR_ERR_TRANSPORT, msg, len);
efree(msg);
return response;
}
}

if (data->buf.s) {
// ...

// 从响应内容中解析出 Yar 协议的头部信息
if (!(header = php_yar_protocol_parse(payload))) {
php_yar_error(response, YAR_ERR_PROTOCOL, "malformed response header '%.32s'", payload);
return response;
}

// 跳过 Yar 协议的头部信息后的内容,就是真正的响应结果
payload += sizeof(yar_header_t);
payload_len -= sizeof(yar_header_t);

// 通过 payload 中的编码方式对 payload 进行解码
if (!(retval = php_yar_packager_unpack(payload, payload_len, &msg, &ret))) {
php_yar_response_set_error(response, YAR_ERR_PACKAGER, msg, strlen(msg));
efree(msg);
return response;
}

// 将解码后的数据存储到 response 中
php_yar_response_map_retval(response, retval);

zval_ptr_dtor(retval);
} else {
// 响应内容为空,设置错误信息
php_yar_response_set_error(response, YAR_ERR_EMPTY_RESPONSE, ZEND_STRL("empty response"));
}

return response;
}

得到响应结果后,调用 php_yar_curl_close 关闭连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// source:transports/curl.c

void php_yar_curl_close(yar_transport_interface_t* self) {
yar_curl_data_t *data = (yar_curl_data_t *)self->data;

if (!data) {
return;
}

if (data->cp) {
if (!data->persistent) {
// 没有开启连接持久化时,清理 curl 连接
curl_easy_cleanup(data->cp);
} else {
// 否则,设置连接为未使用状态
data->plink->in_use = 0;
}
}

// 释放 data 中的相关资源
if (data->host) {
php_url_free(data->host);
}

smart_str_free(&data->buf);
smart_str_free(&data->postfield);
curl_slist_free_all(data->headers);

efree(data);
efree(self);

return;
}

总结

这次没有像上一篇文章那样,将传输模块的生命周期各个阶段都写出来,这是因为传输模块在 PHP 生命周期中的几个阶段做的事情跟编码模块差不多,所以就省略掉了。

通过本文可以看出来,Yar 中的 HTTP 传输方式,实际上是对 curl 的一层封装,我写了一个 curl 的小示例,供大家参考。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include <stdio.h>
#include <curl/curl.h>
#include <curl/easy.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define DEFAULT_BUF_SIZE 1024

typedef struct curl_data_s {
CURL *cp;
char *buf;
uint32_t len;
uint32_t alloc;
} curl_data_t;

size_t curl_buf_writer(char *ptr, size_t size, size_t nmemb, void *ctx) {
curl_data_t *data = (curl_data_t *) ctx;
size_t len = size * nmemb;
size_t remain = data->alloc - data->len;

if (len > remain) {
data->alloc += (len - remain) + DEFAULT_BUF_SIZE;
data->buf = realloc(data->buf, data->alloc);
printf("writer, realloc size: %lu\n", (len - remain) + DEFAULT_BUF_SIZE);
}

printf("write, len: %zu\n", len);

memcpy(data->buf + data->len, ptr, len);
data->len += len;

return len;
}

int main() {
curl_data_t data;
char *address = "https://her-cat.com";
CURL *cp = curl_easy_init();

data.cp = cp;
data.buf = malloc(DEFAULT_BUF_SIZE);
data.len = 0;
data.alloc = DEFAULT_BUF_SIZE;

curl_easy_setopt(cp, CURLOPT_POST, 0);
curl_easy_setopt(cp, CURLOPT_URL, address);
curl_easy_setopt(cp, CURLOPT_SSL_VERIFYHOST, 0);
curl_easy_setopt(cp, CURLOPT_SSL_VERIFYPEER, 0);
curl_easy_setopt(cp, CURLOPT_WRITEDATA, &data);
curl_easy_setopt(cp, CURLOPT_WRITEFUNCTION, curl_buf_writer);

CURLcode ret = curl_easy_perform(cp);
if (ret != CURLE_OK) {
printf("curl perform failed, reason: %s\n", curl_easy_strerror(ret));
return 0;
}

long http_code;
curl_easy_getinfo(cp, CURLINFO_RESPONSE_CODE, &http_code);
curl_easy_cleanup(cp);
curl_global_cleanup();

printf("http code: %ld\n", http_code);
printf("buf len: %u\n", data.len);
printf("buf alloc: %u\n", data.alloc);
printf("response: \n%s\n", data.buf);

return 0;
}