Yar 源码阅读笔记:数据传输模块
前言
在前面几篇文章中,更多的是在研究 Yar 传输的内容,比如协议的格式是什么样的、如何对数据进行编码等等。
今天这篇文章,主要介绍 Yar 编码模块的结构体定义,以及 HTTP 传输方式的实现,更深入的了解 Yar 的协议数据是如何被发送出去的。
传输模块结构体
在 RPC 通信协议 中有提到过,Yar 支持 HTTP 和 TCP 两种数据传输方式,前者使用 curl,后者使用 socket。
yar_transport_t
我们可以将 yar_transport_t 结构体看作是同步传输器的工厂,用于创建和释放同步传输器的实例。
// source:yar_transport.h
// 同步传输器的工厂
typedef struct _yar_transport {
const char *name;
struct _yar_transport_interface * (*init)();
void (*destroy)(yar_transport_interface_t *self);
yar_transport_multi_t *multi;
} yar_transport_t;
结构体字段说明:
- name:传输方式的名称,比如 “curl”。
- init:函数指针,创建同步传输器实例并初始化相关配置,函数的返回值是结构体 _yar_transport_interface 的指针。
- destroy:函数指针,释放某个同步传输器的资源,函数的参数是结构体 _yar_transport_interface 的指针。
- multi:用于实现并行调用的结构体指针,这部分会在并行调用进行详解。
yar_transport_interface_t
在 yar_transport_t 结构体中,只定义了 init 和 destroy 这两个函数指针,用于对同步传输器的创建和释放。而真正用于传输时使用的相关函数,则放在了 yar_transport_interface_t 结构体中。
// source:yar_transport.h
// 同步传输器
typedef struct _yar_transport_interface {
void *data;
int (*open)(struct _yar_transport_interface *self, zend_string *address, long options, char **msg);
int (*send)(struct _yar_transport_interface *self, struct _yar_request *request, char **msg);
struct _yar_response * (*exec)(struct _yar_transport_interface *self, struct _yar_request *request);
int (*setopt)(struct _yar_transport_interface *self, long type, void *value, void *addition);
int (*calldata)(struct _yar_transport_interface *self, yar_call_data_t *calldata);
void (*close)(struct _yar_transport_interface *self);
} yar_transport_interface_t;
结构体说明:
- data:存储传输相关的数据指针,比如 curl 是 yar_curl_data_t 结构体,socket 是 yar_socket_data_t 结构体。
- open:函数指针,打开/创建一个连接,并初始化相关默认值。
- send:函数指针,对请求数据进行编码、组装等操作,并将处理后的数据保存到连接中。
- exec:函数指针,执行请求,将连接中的数据发送出去。
- setopt:函数指针,设置连接相关的选项,比如编码方式、超时时间。
- calldata:函数指针,设置回调时的数据,在并行调用时才会用到。
- close:函数指针:关闭连接并释放相关的资源。
HTTP 传输方式
Yar 支持的传输方式都放在了 transports 目录下,因为 HTTP 是 Yar 默认的数据传输方式,并且服务端也只实现了 HTTP 这种方式,所以我们这里用 HTTP 进行举例。
用 c 语言实现 HTTP 请求的类库有很多,Yar 使用的是 curl,所以在 transports 目录下的文件命名是 curl.c。
yar_transport_t
根据前面传输模块的介绍,首先需要实现 yar_transport_t 结构体。
// source:transports/curl.c
const yar_transport_t yar_transport_curl = {
"curl",
php_yar_curl_init,
php_yar_curl_destroy,
&yar_transport_curl_multi
};
curl 是传输方式的名称,php_yar_curl_init 和 php_yar_curl_destroy 是 init 和 destroy 这两个函数指针的实现,yar_transport_curl_multi 用来实现并行调用。
先来看看 php_yar_curl_init 函数。
// source:transports/curl.c
yar_transport_interface_t *php_yar_curl_init() {
yar_curl_data_t *data;
yar_transport_interface_t *self;
// 分配 yar_transport_interface_t 结构体及数据的内存
self = ecalloc(1, sizeof(yar_transport_interface_t));
self->data = data = ecalloc(1, sizeof(yar_curl_data_t));
// 设置 Yar 的 HTTP 头部信息
/* snprintf(content_type, sizeof(content_type), "Content-Type: %s", YAR_G(content_type)); */
data->headers = curl_slist_append(data->headers, "User-Agent: PHP Yar RPC-" PHP_YAR_VERSION);
data->headers = curl_slist_append(data->headers, "Expect:");
// 设置各个函数指针对应的函数实现
self->open = php_yar_curl_open;
self->send = php_yar_curl_send;
self->exec = php_yar_curl_exec;
self->setopt = php_yar_curl_setopt;
self->calldata = php_yar_curl_set_calldata;
self->close = php_yar_curl_close;
// 分配响应数据及请求数据的内存,默认为 1M
smart_str_alloc((&data->buf), YAR_PACKAGER_BUFFER_SIZE /* 1M */, 0);
smart_str_alloc((&data->postfield), YAR_PACKAGER_BUFFER_SIZE /* 1M */, 0);
// 返回 yar_transport_interface_t 结构体的指针
return self;
}
由于没有资源需要在 destroy 的时候进行释放,所以 php_yar_curl_destroy 是一个空函数。
// source:transports/curl.c
void php_yar_curl_destroy(yar_transport_interface_t *self) {
}
yar_transport_interface_t
接下来看看 curl 中 yar_transport_interface_t 相关的实现。
首先是 php_yar_curl_open 函数,以下是该函数的主要逻辑:
- 如果开启了连接持久化,则尝试从持久化的连接池中找到未使用的连接。
- 没有找到符合条件的连接或者未开启连接持久化,则直接创建一个新的 curl 实例。
- 解析请求地址,并将请求信息及 curl 实例保存到 data 中,方便后续在其它函数中使用。
- 如果开启了连接持久化,则设置 keep-alive 及 keep-alive 有效时间的头部信息。
- 设置自定义的头部信息及域名。
- 设置 curl 收到响应时的回调函数及其它 curl 选项。
- 设置 curl 请求地址。
打开连接之后,调用 php_yar_curl_send 函数对请求数据进行编码、组装等操作。
// source:transports/curl.c
int php_yar_curl_send(yar_transport_interface_t* self, yar_request_t *request, char **msg) {
yar_header_t header = {0};
yar_curl_data_t *data = (yar_curl_data_t *)self->data;
zend_string *payload;
// 对请求数据进行编码
if (!(payload = php_yar_request_pack(request, msg))) {
return 0;
}
// 通过编码后的数据初始化 header 结构体
php_yar_protocol_render(&header, request->id, data->host->user, data->host->pass, ZSTR_LEN(payload), 0);
// 将 header 及 payload 按照顺序追加到请求数据的 buf 中
smart_str_appendl(&data->postfield, (char *)&header, sizeof(yar_header_t));
smart_str_appendl(&data->postfield, ZSTR_VAL(payload), ZSTR_LEN(payload));
zend_string_release(payload);
return 1;
}
数据准备就绪后,就可以调用 php_yar_curl_exec 函数执行请求,将数据发送出去并解析响应结果。
// source:transports/curl.c
yar_response_t *php_yar_curl_exec(yar_transport_interface_t* self, yar_request_t *request) {
// ...
// 设置请求数据及大小
curl_easy_setopt(data->cp, CURLOPT_POSTFIELDS, ZSTR_VAL(data->postfield.s));
curl_easy_setopt(data->cp, CURLOPT_POSTFIELDSIZE, ZSTR_LEN(data->postfield.s));
// 设置超时时间等选项
if (IS_ARRAY == Z_TYPE(request->options)) {
zval *pzval;
if ((pzval = zend_hash_index_find(Z_ARRVAL(request->options), YAR_OPT_TIMEOUT))) {
convert_to_long_ex(pzval);
self->setopt(self, YAR_OPT_TIMEOUT, (long *)&Z_LVAL_P(pzval), NULL);
}
if ((pzval = zend_hash_index_find(Z_ARRVAL(request->options), YAR_OPT_CONNECT_TIMEOUT))) {
convert_to_long_ex(pzval);
self->setopt(self, YAR_OPT_CONNECT_TIMEOUT, (long *)&Z_LVAL_P(pzval), NULL);
}
if ((pzval = zend_hash_index_find(Z_ARRVAL(request->options), YAR_OPT_PROXY))) {
convert_to_string_ex(pzval);
self->setopt(self, YAR_OPT_PROXY, (char *)&Z_STRVAL_P(pzval), NULL);
}
}
response = php_yar_response_instance();
// 执行请求
ret = curl_easy_perform(data->cp);
if (ret != CURLE_OK) {
// 请求失败则将失败信息保存到 response 中并返回
len = spprintf(&msg, 0, "curl exec failed '%s'", curl_easy_strerror(ret));
php_yar_response_set_error(response, YAR_ERR_TRANSPORT, msg, len);
efree(msg);
return response;
} else {
long http_code;
// 检查响应状态码是否等于 200
if(curl_easy_getinfo(data->cp, CURLINFO_RESPONSE_CODE, &http_code) == CURLE_OK
&& http_code != 200) {
len = spprintf(&msg, 0, "server responsed non-200 code '%ld'", http_code);
php_yar_response_set_error(response, YAR_ERR_TRANSPORT, msg, len);
efree(msg);
return response;
}
}
if (data->buf.s) {
// ...
// 从响应内容中解析出 Yar 协议的头部信息
if (!(header = php_yar_protocol_parse(payload))) {
php_yar_error(response, YAR_ERR_PROTOCOL, "malformed response header '%.32s'", payload);
return response;
}
// 跳过 Yar 协议的头部信息后的内容,就是真正的响应结果
payload += sizeof(yar_header_t);
payload_len -= sizeof(yar_header_t);
// 通过 payload 中的编码方式对 payload 进行解码
if (!(retval = php_yar_packager_unpack(payload, payload_len, &msg, &ret))) {
php_yar_response_set_error(response, YAR_ERR_PACKAGER, msg, strlen(msg));
efree(msg);
return response;
}
// 将解码后的数据存储到 response 中
php_yar_response_map_retval(response, retval);
zval_ptr_dtor(retval);
} else {
// 响应内容为空,设置错误信息
php_yar_response_set_error(response, YAR_ERR_EMPTY_RESPONSE, ZEND_STRL("empty response"));
}
return response;
}
得到响应结果后,调用 php_yar_curl_close 关闭连接。
// source:transports/curl.c
void php_yar_curl_close(yar_transport_interface_t* self) {
yar_curl_data_t *data = (yar_curl_data_t *)self->data;
if (!data) {
return;
}
if (data->cp) {
if (!data->persistent) {
// 没有开启连接持久化时,清理 curl 连接
curl_easy_cleanup(data->cp);
} else {
// 否则,设置连接为未使用状态
data->plink->in_use = 0;
}
}
// 释放 data 中的相关资源
if (data->host) {
php_url_free(data->host);
}
smart_str_free(&data->buf);
smart_str_free(&data->postfield);
curl_slist_free_all(data->headers);
efree(data);
efree(self);
return;
}
总结
这次没有像上一篇文章那样,将传输模块的生命周期各个阶段都写出来,这是因为传输模块在 PHP 生命周期中的几个阶段做的事情跟编码模块差不多,所以就省略掉了。
通过本文可以看出来,Yar 中的 HTTP 传输方式,实际上是对 curl 的一层封装,我写了一个 curl 的小示例,供大家参考。
#include <stdio.h>
#include <curl/curl.h>
#include <curl/easy.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#define DEFAULT_BUF_SIZE 1024
typedef struct curl_data_s {
CURL *cp;
char *buf;
uint32_t len;
uint32_t alloc;
} curl_data_t;
size_t curl_buf_writer(char *ptr, size_t size, size_t nmemb, void *ctx) {
curl_data_t *data = (curl_data_t *) ctx;
size_t len = size * nmemb;
size_t remain = data->alloc - data->len;
if (len > remain) {
data->alloc += (len - remain) + DEFAULT_BUF_SIZE;
data->buf = realloc(data->buf, data->alloc);
printf("writer, realloc size: %lu\n", (len - remain) + DEFAULT_BUF_SIZE);
}
printf("write, len: %zu\n", len);
memcpy(data->buf + data->len, ptr, len);
data->len += len;
return len;
}
int main() {
curl_data_t data;
char *address = "https://her-cat.com";
CURL *cp = curl_easy_init();
data.cp = cp;
data.buf = malloc(DEFAULT_BUF_SIZE);
data.len = 0;
data.alloc = DEFAULT_BUF_SIZE;
curl_easy_setopt(cp, CURLOPT_POST, 0);
curl_easy_setopt(cp, CURLOPT_URL, address);
curl_easy_setopt(cp, CURLOPT_SSL_VERIFYHOST, 0);
curl_easy_setopt(cp, CURLOPT_SSL_VERIFYPEER, 0);
curl_easy_setopt(cp, CURLOPT_WRITEDATA, &data);
curl_easy_setopt(cp, CURLOPT_WRITEFUNCTION, curl_buf_writer);
CURLcode ret = curl_easy_perform(cp);
if (ret != CURLE_OK) {
printf("curl perform failed, reason: %s\n", curl_easy_strerror(ret));
return 0;
}
long http_code;
curl_easy_getinfo(cp, CURLINFO_RESPONSE_CODE, &http_code);
curl_easy_cleanup(cp);
curl_global_cleanup();
printf("http code: %ld\n", http_code);
printf("buf len: %u\n", data.len);
printf("buf alloc: %u\n", data.alloc);
printf("response: \n%s\n", data.buf);
return 0;
}