Yar 源码阅读笔记:数据传输模块

2022-01-06
5分钟阅读时长

前言

在前面几篇文章中,更多的是在研究 Yar 传输的内容,比如协议的格式是什么样的、如何对数据进行编码等等。

今天这篇文章,主要介绍 Yar 编码模块的结构体定义,以及 HTTP 传输方式的实现,更深入的了解 Yar 的协议数据是如何被发送出去的。

传输模块结构体

RPC 通信协议 中有提到过,Yar 支持 HTTP 和 TCP 两种数据传输方式,前者使用 curl,后者使用 socket。

yar_transport_t

我们可以将 yar_transport_t 结构体看作是同步传输器的工厂,用于创建和释放同步传输器的实例。

// source:yar_transport.h

// 同步传输器的工厂
typedef struct _yar_transport {
    const char *name;
    struct _yar_transport_interface * (*init)();
    void (*destroy)(yar_transport_interface_t *self);
    yar_transport_multi_t *multi;
} yar_transport_t;

结构体字段说明:

  • name:传输方式的名称,比如 “curl”。
  • init:函数指针,创建同步传输器实例并初始化相关配置,函数的返回值是结构体 _yar_transport_interface 的指针。
  • destroy:函数指针,释放某个同步传输器的资源,函数的参数是结构体 _yar_transport_interface 的指针。
  • multi:用于实现并行调用的结构体指针,这部分会在并行调用进行详解。

yar_transport_interface_t

在 yar_transport_t 结构体中,只定义了 init 和 destroy 这两个函数指针,用于对同步传输器的创建和释放。而真正用于传输时使用的相关函数,则放在了 yar_transport_interface_t 结构体中。

// source:yar_transport.h

// 同步传输器
typedef struct _yar_transport_interface {
    void *data;
    int  (*open)(struct _yar_transport_interface *self, zend_string *address, long options, char **msg);
    int  (*send)(struct _yar_transport_interface *self, struct _yar_request *request, char **msg);
    struct _yar_response * (*exec)(struct _yar_transport_interface *self, struct _yar_request *request);
    int  (*setopt)(struct _yar_transport_interface *self, long type, void *value, void *addition);
    int  (*calldata)(struct _yar_transport_interface *self, yar_call_data_t *calldata);
    void (*close)(struct _yar_transport_interface *self);
} yar_transport_interface_t;

结构体说明:

  • data:存储传输相关的数据指针,比如 curl 是 yar_curl_data_t 结构体,socket 是 yar_socket_data_t 结构体。
  • open:函数指针,打开/创建一个连接,并初始化相关默认值。
  • send:函数指针,对请求数据进行编码、组装等操作,并将处理后的数据保存到连接中。
  • exec:函数指针,执行请求,将连接中的数据发送出去。
  • setopt:函数指针,设置连接相关的选项,比如编码方式、超时时间。
  • calldata:函数指针,设置回调时的数据,在并行调用时才会用到。
  • close:函数指针:关闭连接并释放相关的资源。

HTTP 传输方式

Yar 支持的传输方式都放在了 transports 目录下,因为 HTTP 是 Yar 默认的数据传输方式,并且服务端也只实现了 HTTP 这种方式,所以我们这里用 HTTP 进行举例。

用 c 语言实现 HTTP 请求的类库有很多,Yar 使用的是 curl,所以在 transports 目录下的文件命名是 curl.c。

yar_transport_t

根据前面传输模块的介绍,首先需要实现 yar_transport_t 结构体。

// source:transports/curl.c

const yar_transport_t yar_transport_curl = {
    "curl",
    php_yar_curl_init,
    php_yar_curl_destroy,
    &yar_transport_curl_multi
};

curl 是传输方式的名称,php_yar_curl_init 和 php_yar_curl_destroy 是 init 和 destroy 这两个函数指针的实现,yar_transport_curl_multi 用来实现并行调用。

先来看看 php_yar_curl_init 函数。

// source:transports/curl.c

yar_transport_interface_t *php_yar_curl_init() {
    yar_curl_data_t *data;
    yar_transport_interface_t *self;

    // 分配 yar_transport_interface_t 结构体及数据的内存
    self = ecalloc(1, sizeof(yar_transport_interface_t));
    self->data = data = ecalloc(1, sizeof(yar_curl_data_t));

    // 设置 Yar 的 HTTP 头部信息
    /* snprintf(content_type, sizeof(content_type), "Content-Type: %s", YAR_G(content_type)); */
    data->headers = curl_slist_append(data->headers, "User-Agent: PHP Yar RPC-" PHP_YAR_VERSION);
    data->headers = curl_slist_append(data->headers, "Expect:");

    // 设置各个函数指针对应的函数实现
    self->open       = php_yar_curl_open;
    self->send       = php_yar_curl_send;
    self->exec       = php_yar_curl_exec;
    self->setopt     = php_yar_curl_setopt;
    self->calldata   = php_yar_curl_set_calldata;
    self->close      = php_yar_curl_close;

    // 分配响应数据及请求数据的内存,默认为 1M
    smart_str_alloc((&data->buf), YAR_PACKAGER_BUFFER_SIZE /* 1M */, 0);
    smart_str_alloc((&data->postfield), YAR_PACKAGER_BUFFER_SIZE /* 1M */, 0);

    // 返回 yar_transport_interface_t 结构体的指针
    return  self;
}

由于没有资源需要在 destroy 的时候进行释放,所以 php_yar_curl_destroy 是一个空函数。

// source:transports/curl.c

void php_yar_curl_destroy(yar_transport_interface_t *self) {
} 

yar_transport_interface_t

接下来看看 curl 中 yar_transport_interface_t 相关的实现。

首先是 php_yar_curl_open 函数,以下是该函数的主要逻辑:

  • 如果开启了连接持久化,则尝试从持久化的连接池中找到未使用的连接。
  • 没有找到符合条件的连接或者未开启连接持久化,则直接创建一个新的 curl 实例。
  • 解析请求地址,并将请求信息及 curl 实例保存到 data 中,方便后续在其它函数中使用。
  • 如果开启了连接持久化,则设置 keep-alive 及 keep-alive 有效时间的头部信息。
  • 设置自定义的头部信息及域名。
  • 设置 curl 收到响应时的回调函数及其它 curl 选项。
  • 设置 curl 请求地址。

打开连接之后,调用 php_yar_curl_send 函数对请求数据进行编码、组装等操作。

// source:transports/curl.c

int php_yar_curl_send(yar_transport_interface_t* self, yar_request_t *request, char **msg) {
    yar_header_t header = {0};
    yar_curl_data_t *data = (yar_curl_data_t *)self->data;
    zend_string *payload;

    // 对请求数据进行编码
    if (!(payload = php_yar_request_pack(request, msg))) {
        return 0;
    }

    // 通过编码后的数据初始化 header 结构体
    php_yar_protocol_render(&header, request->id, data->host->user, data->host->pass, ZSTR_LEN(payload), 0);

    // 将 header 及 payload 按照顺序追加到请求数据的 buf 中
    smart_str_appendl(&data->postfield, (char *)&header, sizeof(yar_header_t));
    smart_str_appendl(&data->postfield, ZSTR_VAL(payload), ZSTR_LEN(payload));
    zend_string_release(payload);

    return 1;
}

数据准备就绪后,就可以调用 php_yar_curl_exec 函数执行请求,将数据发送出去并解析响应结果。

// source:transports/curl.c

yar_response_t *php_yar_curl_exec(yar_transport_interface_t* self, yar_request_t *request) {
    // ...

    // 设置请求数据及大小
    curl_easy_setopt(data->cp, CURLOPT_POSTFIELDS, ZSTR_VAL(data->postfield.s));
    curl_easy_setopt(data->cp, CURLOPT_POSTFIELDSIZE, ZSTR_LEN(data->postfield.s));

    // 设置超时时间等选项
    if (IS_ARRAY == Z_TYPE(request->options)) {
        zval *pzval;
        if ((pzval = zend_hash_index_find(Z_ARRVAL(request->options), YAR_OPT_TIMEOUT))) {
            convert_to_long_ex(pzval);
            self->setopt(self, YAR_OPT_TIMEOUT, (long *)&Z_LVAL_P(pzval), NULL);
        }
        if ((pzval = zend_hash_index_find(Z_ARRVAL(request->options), YAR_OPT_CONNECT_TIMEOUT))) {
            convert_to_long_ex(pzval);
            self->setopt(self, YAR_OPT_CONNECT_TIMEOUT, (long *)&Z_LVAL_P(pzval), NULL);
        }
        if ((pzval = zend_hash_index_find(Z_ARRVAL(request->options), YAR_OPT_PROXY))) {
            convert_to_string_ex(pzval);
            self->setopt(self, YAR_OPT_PROXY, (char *)&Z_STRVAL_P(pzval), NULL);
        }
    }

    response = php_yar_response_instance();

    // 执行请求
    ret = curl_easy_perform(data->cp);
    if (ret != CURLE_OK) {
        // 请求失败则将失败信息保存到 response 中并返回
        len = spprintf(&msg, 0, "curl exec failed '%s'", curl_easy_strerror(ret));
        php_yar_response_set_error(response, YAR_ERR_TRANSPORT, msg, len);
        efree(msg);
        return response;
    } else {
        long http_code;
        // 检查响应状态码是否等于 200
        if(curl_easy_getinfo(data->cp, CURLINFO_RESPONSE_CODE, &http_code) == CURLE_OK 
                && http_code != 200) {
            len = spprintf(&msg, 0, "server responsed non-200 code '%ld'", http_code);
            php_yar_response_set_error(response, YAR_ERR_TRANSPORT, msg, len);
            efree(msg);
            return response;
        }
    }

    if (data->buf.s) {
        // ...

        // 从响应内容中解析出 Yar 协议的头部信息
        if (!(header = php_yar_protocol_parse(payload))) {
            php_yar_error(response, YAR_ERR_PROTOCOL, "malformed response header '%.32s'", payload);
            return response;
        }

        // 跳过 Yar 协议的头部信息后的内容,就是真正的响应结果
        payload += sizeof(yar_header_t);
        payload_len -= sizeof(yar_header_t);

        // 通过 payload 中的编码方式对 payload 进行解码
        if (!(retval = php_yar_packager_unpack(payload, payload_len, &msg, &ret))) {
            php_yar_response_set_error(response, YAR_ERR_PACKAGER, msg, strlen(msg));
            efree(msg);
            return response;
        }

        // 将解码后的数据存储到 response 中
        php_yar_response_map_retval(response, retval);

        zval_ptr_dtor(retval);
    } else {
        // 响应内容为空,设置错误信息
        php_yar_response_set_error(response, YAR_ERR_EMPTY_RESPONSE, ZEND_STRL("empty response"));
    }    

    return response;
}

得到响应结果后,调用 php_yar_curl_close 关闭连接。

// source:transports/curl.c

void php_yar_curl_close(yar_transport_interface_t* self) {
    yar_curl_data_t *data = (yar_curl_data_t *)self->data;

    if (!data) {
        return;
    }

    if (data->cp) {
        if (!data->persistent) {
            // 没有开启连接持久化时,清理 curl 连接
            curl_easy_cleanup(data->cp);
        } else {
            // 否则,设置连接为未使用状态
            data->plink->in_use = 0;
        }
    }

    // 释放 data 中的相关资源
    if (data->host) {
        php_url_free(data->host);
    }

    smart_str_free(&data->buf);
    smart_str_free(&data->postfield);
    curl_slist_free_all(data->headers);

    efree(data);
    efree(self);

    return;
}

总结

这次没有像上一篇文章那样,将传输模块的生命周期各个阶段都写出来,这是因为传输模块在 PHP 生命周期中的几个阶段做的事情跟编码模块差不多,所以就省略掉了。

通过本文可以看出来,Yar 中的 HTTP 传输方式,实际上是对 curl 的一层封装,我写了一个 curl 的小示例,供大家参考。

#include <stdio.h>
#include <curl/curl.h>
#include <curl/easy.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define DEFAULT_BUF_SIZE 1024

typedef struct curl_data_s {
    CURL *cp;
    char *buf;
    uint32_t len;
    uint32_t alloc;
} curl_data_t;

size_t curl_buf_writer(char *ptr, size_t size, size_t nmemb, void *ctx) {
    curl_data_t *data = (curl_data_t *) ctx;
    size_t len = size * nmemb;
    size_t remain = data->alloc - data->len;

    if (len > remain) {
        data->alloc += (len - remain) + DEFAULT_BUF_SIZE;
        data->buf = realloc(data->buf, data->alloc);
        printf("writer, realloc size: %lu\n", (len - remain) + DEFAULT_BUF_SIZE);
    }

    printf("write, len: %zu\n", len);

    memcpy(data->buf + data->len, ptr, len);
    data->len += len;

    return len;
}

int main() {
    curl_data_t data;
    char *address = "https://her-cat.com";
    CURL *cp = curl_easy_init();

    data.cp = cp;
    data.buf = malloc(DEFAULT_BUF_SIZE);
    data.len = 0;
    data.alloc = DEFAULT_BUF_SIZE;

    curl_easy_setopt(cp, CURLOPT_POST, 0);
    curl_easy_setopt(cp, CURLOPT_URL, address);
    curl_easy_setopt(cp, CURLOPT_SSL_VERIFYHOST, 0);
    curl_easy_setopt(cp, CURLOPT_SSL_VERIFYPEER, 0);
    curl_easy_setopt(cp, CURLOPT_WRITEDATA, &data);
    curl_easy_setopt(cp, CURLOPT_WRITEFUNCTION, curl_buf_writer);

    CURLcode ret = curl_easy_perform(cp);
    if (ret != CURLE_OK) {
        printf("curl perform failed, reason: %s\n", curl_easy_strerror(ret));
        return 0;
    }

    long http_code;
    curl_easy_getinfo(cp, CURLINFO_RESPONSE_CODE, &http_code);
    curl_easy_cleanup(cp);
    curl_global_cleanup();

    printf("http code: %ld\n", http_code);
    printf("buf len: %u\n", data.len);
    printf("buf alloc: %u\n", data.alloc);
    printf("response: \n%s\n", data.buf);

    return 0;
}
本文作者:她和她的猫
本文地址https://her-cat.com/posts/2022/01/06/yar-internals-transport/
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!