Yar 源码阅读笔记:客户端的并行调用

2022-01-14
10分钟阅读时长

前言

在上一篇文章中,介绍了客户端同步调用的具体实现的,主要还是通过调用传输模块的相关函数,完成发送和接收远程调用的数据。

在调用多个远程方法时,同步调用是以串行的方式执行的,导致运行效率比较低,所以需要使用并行调用来提高调用多个远程方法的运行效率,减少整体运行的时间。

并行传输模块

yar_transport_multi_t

在数据传输模块中,我们提到了 yar_transport_multi_t *multi 就是用来实现并行调用的,先来看看该结构体的定义。

// source:yar_transport.h

// 并行传输器的工厂
typedef struct _yar_transport_multi {
    struct _yar_transport_multi_interface * (*init)();
} yar_transport_multi_t;

可以将 yar_transport_multi_t 看作是并行传输器的工厂,使用 init 函数创建并行传输器的实例,也就是为 _yar_transport_multi_interface 结构体分配内存,并初始化相关配置。

yar_transport_multi_interface_t

// source:yar_transport.h

// 并行传输器
typedef struct _yar_transport_multi_interface {
    void *data;
    int (*add)(struct _yar_transport_multi_interface *self,     yar_transport_interface_t *cp);
    int (*exec)(struct _yar_transport_multi_interface *self,     yar_concurrent_client_callback *callback);
    void (*close)(struct _yar_transport_multi_interface *self);
} yar_transport_multi_interface_t;

其实 yar_transport_multi_interface_t 结构体和 yar_transport_interface_t 结构体类似,都是用来定义在传输时使用的函数,下面是字段说明:

  • data:存储并行传输器的数据指针,指向 yar_curl_multi_data_t 结构体。
  • add:函数指针,将一个同步传输器(yar_transport_interface_t)实例中的数据存储到并行传输器中。
  • exec:函数指针,执行请求,将所有的同步传输器实例中的数据发送出去。
  • close:函数指针,释放相关资源。

从上面的说明可以看出来,并行传输器实际上是在管理多个同步传输器实例,先调用 add 函数将同步传输器实例存储到并行传输器中,然后通过 exec 函数将这些同步传输器实例中的数据发送出去,并对响应结果进行处理,最后调用 close 函数清理相关资源。

yar_curl_multi_data_t

虽然 yar_curl_multi_data_t 结构体的字段比较少,但是通过这个结构体,可以知道并行传输器是如何存储同步传输器实例的数据的。

// source:transports/curl.c

typedef struct _yar_curl_multi_data_t {
    CURLM *cm;
    yar_transport_interface_t *chs;
} yar_curl_multi_data_t;

cm 字段用来存储 curl 批处理实例的指针,chs 则是同步传输器的指针,同步传输器中 data 字段指向的结构体中有一个 next 字段,用来指向下一个同步传输器:

// source:transports/curl.c

typedef struct _yar_curl_data_t {
    // ...
    yar_transport_interface_t *next;
} yar_curl_data_t;

看到这里你应该就明白了,Yar 通过单链表的结构将所有的同步传输器实例串起来, yar_curl_multi_data_t 中的 chs 字段相当于头指针。

并行客户端

先来看看并行客户端的方法列表及属性。

// source:yar_client.c

    // 方法列表
    zend_function_entry yar_concurrent_client_methods[] = {
        PHP_ME(yar_concurrent_client, call, arginfo_client_async, ZEND_ACC_PUBLIC|ZEND_ACC_STATIC)
        PHP_ME(yar_concurrent_client, loop, arginfo_client_loop, ZEND_ACC_PUBLIC|ZEND_ACC_STATIC)
        PHP_ME(yar_concurrent_client, reset,arginfo_client_void, ZEND_ACC_PUBLIC|ZEND_ACC_STATIC)
        PHP_FE_END
    };

    INIT_CLASS_ENTRY(ce, "Yar_Concurrent_Client", yar_concurrent_client_methods);
    yar_concurrent_client_ce = zend_register_internal_class(&ce);
    // 声明并行客户端的属性
    zend_declare_property_null(yar_concurrent_client_ce, ZEND_STRL("_callstack"), ZEND_ACC_PROTECTED|ZEND_ACC_STATIC);
    zend_declare_property_null(yar_concurrent_client_ce, ZEND_STRL("_callback"), ZEND_ACC_PROTECTED|ZEND_ACC_STATIC);
    zend_declare_property_null(yar_concurrent_client_ce, ZEND_STRL("_error_callback"), ZEND_ACC_PROTECTED|ZEND_ACC_STATIC);
    zend_declare_property_bool(yar_concurrent_client_ce, ZEND_STRL("_start"), 0, ZEND_ACC_PROTECTED|ZEND_ACC_STATIC);

由此我们可以知道并行客户端大概长这样:

class {
    protected static $_callstack = null; // 存储每个调用的数据
    protected static $_callback = null; // 并行客户端的回调函数
    protected static $_error_callback = null; // 并行客户端的异常回调函数
    protected static $_start = false; // 是否已启动
    
    // 调用某个方法
    public static function call($uri, $method, $parameters = null, $callback = null, $error_callback = null, $options = array()) 
    {
        
    }
    
    // 执行调用并等待响应
    public static function loop($callback = null, $error_callback = null) 
    {
        
    }
    
    // 重置并行客户端
    public static function reset()
    {
        
    }
}

Yar 定义了 yar_call_data_t 结构体将 call 方法的参数存起来,当作远程调用的数据。

// source:yar_transport.h

typedef struct _yar_call_data {
    zend_long sequence;
    zend_string *uri;
    zend_string *method;
    zval callback;
    zval ecallback;
    zval parameters;
    zval options;
} yar_call_data_t;

在 PHP 中使用该结构体前,需要在 Yar 启动时向 PHP 注册该资源类型,同时传入析构函数,用于 PHP 释放该资源的时候调用。

YAR_STARTUP_FUNCTION(transport) {
    // ... 

    le_calldata = zend_register_list_destructors_ex(php_yar_calldata_dtor, NULL, "Yar Call Data", module_number);

    return SUCCESS;
}

zend_register_list_destructors_ex 函数会返回一个整数,表示注册的资源类型,在取回资源时需要用到该整数。

并行调用

这次我们使用 gdb 调试的方式,一步步的展示并行调用的过程,在此之前需要编译好 Yar,编译时记得关闭编译优化,并在 php.ini 中配置好扩展,然后启动开篇中的 Yar 服务端示例

Yar_Concurrent_Client::call

客户端的并行调用示例中,先调用了 call 方法,然后调用了 loop 方法,所以我们分别在这两个方法的上打断点。通过上面并行客户端的介绍可以知道,实现 call 方法的函数是 zim_yar_concurrent_client_call,我们在该函数上打断点即可。

$ gdb php
(gdb) b zim_yar_concurrent_client_call
Function "zim_yar_concurrent_client_call" not defined.
Make breakpoint pending on future shared library load? (y or [n]) y
Breakpoint 1 (zim_yar_concurrent_client_call) pending.
(gdb)

开始运行 PHP 并指定运行的 PHP 脚本。

(gdb) r concurrent_client.php
Starting program: /usr/bin/php concurrent_client.php
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".

Breakpoint 1, zim_yar_concurrent_client_call (
    execute_data=0x5555557de9ee <zend_restore_lexical_state+222>,
    return_value=0x555555a2b240 <language_scanner_globals>)
    at /home/vagrant/code/her-cat/yar/yar_client.c:645
645    PHP_METHOD(yar_concurrent_client, call) {
(gdb)

zim_yar_concurrent_client_call 函数作用是将 call 方法的参数组装为 yar_call_data_t 结构体,并存储到并行客户端的 _callstack 数组中,然后返回本次调用的序号。

// source:yar_client.c

PHP_METHOD(yar_concurrent_client, call) {
    zend_string *uri, *method;
    zend_string *name = NULL;
    zval *callstack, item, *status;
    zval *error_callback = NULL, *callback = NULL, *parameters = NULL, *options = NULL;
    yar_call_data_t *entry;

    // 解析 call 方法中的参数
    if (zend_parse_parameters(ZEND_NUM_ARGS(), "SS|a!z!za",
                &uri, &method, &parameters, &callback, &error_callback, &options) == FAILURE) {
        return;
    }

    // 省略一些对参数校验的代码...

    
    // 校验并行客户端的状态
    status = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_start"), 0);
    if (UNEXPECTED(Z_TYPE_P(status) == IS_TRUE)) {
        php_error_docref(NULL, E_WARNING, "concurrent client has already started");
        RETURN_FALSE;
    }

    // 分配调用数据的内存
    entry = ecalloc(1, sizeof(yar_call_data_t));

    // 将参数赋值到调用数据上
    entry->uri = zend_string_copy(uri);
    entry->method = zend_string_copy(method);

    if (callback && !Z_ISNULL_P(callback)) {
        ZVAL_COPY(&entry->callback, callback);
    }
    if (error_callback && !Z_ISNULL_P(error_callback)) {
        ZVAL_COPY(&entry->ecallback, error_callback);
    }
    if (parameters && IS_ARRAY == Z_TYPE_P(parameters)) {
        ZVAL_COPY(&entry->parameters, parameters);
    }
    if (options && IS_ARRAY == Z_TYPE_P(options)) {
        ZVAL_COPY(&entry->options, options);
    }

    // 初始化调用栈数组
    callstack = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callstack"), 0);
    if (Z_ISNULL_P(callstack)) {
        zval rv;
        array_init(&rv);
        zend_update_static_property(yar_concurrent_client_ce, ZEND_STRL("_callstack"), &rv);
        ZVAL_ARR(callstack, Z_ARRVAL(rv));
        Z_DELREF(rv);
    }

    // 将调用数据注册到 zend 中,并返回 zend_resouce 类型的数据
    ZVAL_RES(&item, zend_register_resource(entry, le_calldata));

    // 设置调用数据的序号
    entry->sequence = zend_hash_num_elements(Z_ARRVAL_P(callstack)) + 1;

    // 将 zend_resource 类型的调用数据存储到 callstack 中
    zend_hash_next_index_insert(Z_ARRVAL_P(callstack), &item);

    RETURN_LONG(entry->sequence);
}

让代码执行到最后一行,并打印 entry 中的数据。

(gdb) u 733
zim_yar_concurrent_client_call (execute_data=0x7ffff54130d0,
    return_value=0x7fffffffaa80)
    at /home/vagrant/code/her-cat/yar/yar_client.c:733
733        RETURN_LONG(entry->sequence);
(gdb) p *entry->uri.val@32
$5 = "http://127.0.0.1:3000/server.php"
(gdb) p *entry->method.val@5
$6 = "login"
(gdb) p entry->sequence
$7 = 1

Yar_Concurrent_Client::loop

先在 loop 方法上打断点,然后让代码执行到断点处。

(gdb) b zim_yar_concurrent_client_loop
Breakpoint 2 at 0x7ffff56e97f7: file /home/vagrant/code/her-cat/yar/yar_client.c, line 751.
(gdb) c
Continuing.

Breakpoint 1, zim_yar_concurrent_client_call (execute_data=0x7ffff54130d0, return_value=0x7fffffffaa80)
    at /home/vagrant/code/her-cat/yar/yar_client.c:645
645    PHP_METHOD(yar_concurrent_client, call) {
(gdb)

在 zim_yar_concurrent_client_loop 函数中,先校验 loop 方法的参数及并行客户端的运行状态,然后从并行客户端中读取 _callstack 数组,调用 php_yar_concurrent_client_handle 函数。

// source:yar_client.c

PHP_METHOD(yar_concurrent_client, loop) {
    zend_string *name = NULL;
    zval *callstack;
    zval *callback = NULL, *error_callback = NULL;
    zval *status;
    unsigned ret = 0;

    // 解析参数
    if (zend_parse_parameters(ZEND_NUM_ARGS(), "|zz", &callback, &error_callback) == FAILURE) {
        return;
    }

    // 判断运行状态
    status = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_start"), 0);
    if (UNEXPECTED(Z_TYPE_P(status) == IS_TRUE)) {
        php_error_docref(NULL, E_WARNING, "concurrent client has already started");
        RETURN_FALSE;
    }
    
    // 省略一些对参数校验的代码...

    // 读取所有的调用数据
    callstack = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callstack"), 0);
    if (Z_ISNULL_P(callstack) || zend_hash_num_elements(Z_ARRVAL_P(callstack)) == 0) {
        RETURN_TRUE;
    }

    // 更新为运行中的状态
    ZVAL_BOOL(status, 1);
    ret = php_yar_concurrent_client_handle(callstack);
    ZVAL_BOOL(status, 0);
    RETURN_BOOL(ret);
}

php_yar_concurrent_client_handle

让代码执行到调用 php_yar_concurrent_client_handle 的前一行,然后进入函数内部。

(gdb) u 801
zim_yar_concurrent_client_loop (execute_data=0x7ffff54130d0, return_value=0x7fffffffaa80)
    at /home/vagrant/code/her-cat/yar/yar_client.c:801
801        ZVAL_BOOL(status, 1);
(gdb) s
802        ret = php_yar_concurrent_client_handle(callstack);
(gdb) s
php_yar_concurrent_client_handle (callstack=0x0) at /home/vagrant/code/her-cat/yar/yar_client.c:456
456    int php_yar_concurrent_client_handle(zval *callstack) /* {{{ */ {
(gdb)

php_yar_concurrent_client_handle 函数的主要逻辑:

  • 遍历 callstack 数组
    • 为每一个调用数据创建同步传输器。
    • 分别调用同步传输器中的 open、send、calldata 等函数初始化调用请求。
    • 调用 multi->add 函数将这些同步传输器存储到并行调用器中。
  • 调用 multi->exec 函数将这些请求发送出去。
  • 收到响应结果后调用 php_yar_concurrent_client_callback 函数进行处理。
  • 最后调用 multi->close 函数释放相关资源。
// source:yar_client.c

int php_yar_concurrent_client_handle(zval *callstack) /* {{{ */ {
    char *msg;
    zval *calldata;
    yar_request_t *request;
    const yar_transport_t *factory;
    yar_transport_interface_t *transport;
    yar_transport_multi_interface_t *multi;

    // 获取 curl 传输器工厂实例
    factory = php_yar_transport_get(ZEND_STRL("curl"));
    // 创建并行传输器实例
    multi = factory->multi->init();

    // 遍历 callstack 数组
    ZEND_HASH_FOREACH_VAL(Z_ARRVAL_P(callstack), calldata) {
        yar_call_data_t *entry;
        long flags = 0;

        entry = (yar_call_data_t *)zend_fetch_resource(Z_RES_P(calldata), "Yar Call Data", le_calldata);

        if (!entry) {
            continue;
        }

        if (Z_ISUNDEF(entry->parameters)) {
            array_init(&entry->parameters);
        } 

        // 为每一个调用数据创建同步传输器
        transport = factory->init();

        if (!Z_ISUNDEF(entry->options)) {
            zval *flag = php_yar_client_get_opt(&entry->options, YAR_OPT_PERSISTENT);
            if (flag && (Z_TYPE_P(flag) == IS_TRUE || (Z_TYPE_P(flag) == IS_LONG && Z_LVAL_P(flag)))) {
                flags |= YAR_PROTOCOL_PERSISTENT;
            }
        }

        // 创建调用请求
        if (!(request = php_yar_request_instance(entry->method,
                        &entry->parameters, Z_ISUNDEF(entry->options)? NULL: & entry->options))) {
            transport->close(transport);
            factory->destroy(transport);
            return 0;
        }

        // 创建一个连接
        msg = (char*)&entry->options;
        if (!transport->open(transport, entry->uri, flags, &msg)) {
            php_yar_client_trigger_error(1, YAR_ERR_TRANSPORT, msg);
            transport->close(transport);
            factory->destroy(transport);
            efree(msg);
            return 0;
        }

        // 对请求数据进行编码、组装等操作,并将处理后的数据保存到连接中
        if (!transport->send(transport, request, &msg)) {
            php_yar_client_trigger_error(1, YAR_ERR_TRANSPORT, msg);
            transport->close(transport);
            factory->destroy(transport);
            efree(msg);
            return 0;
        }
        
        // 将调用数据保存到同步传输器中
        transport->calldata(transport, entry);
        // 将同步传输器存储到并行调用器中。
        multi->add(multi, transport);
        // 释放请求
        php_yar_request_destroy(request);
    } ZEND_HASH_FOREACH_END();

    // 执行所有请求
    if (!multi->exec(multi, php_yar_concurrent_client_callback)) {
        multi->close(multi);
        return 0;
    }

    // 释放资源
    multi->close(multi);
    return 1;
}

我们看下并行传输器中函数指针对应的函数。

(gdb) p *multi
$10 = {data = 0x7ffff5461050, add = 0x7ffff56eea4e <php_yar_curl_multi_add_handle>,
  exec = 0x7ffff56ef1e4 <php_yar_curl_multi_exec>, close = 0x7ffff56ef6af <php_yar_curl_multi_close>}

可以看到 add、exec、close 分别对应 php_yar_curl_multi_add_handle、php_yar_curl_multi_exec、php_yar_curl_multi_close 等函数,分别在这些函数上打断点,然后继续运行程序。

(gdb) b php_yar_curl_multi_add_handle
Breakpoint 3 at 0x7ffff56eea4e: file /home/vagrant/code/her-cat/yar/transports/curl.c, line 604.
(gdb) b php_yar_curl_multi_exec
Breakpoint 4 at 0x7ffff56ef1e4: file /home/vagrant/code/her-cat/yar/transports/curl.c, line 748.
(gdb) b php_yar_curl_multi_close
Breakpoint 5 at 0x7ffff56ef6af: file /home/vagrant/code/her-cat/yar/transports/curl.c, line 951.
(gdb) c
Continuing.

Breakpoint 3, php_yar_curl_multi_add_handle (self=0x7ffff5402428, handle=0x7ffff548e060)
    at /home/vagrant/code/her-cat/yar/transports/curl.c:604
warning: Source file is more recent than executable.
604    int php_yar_curl_multi_add_handle(yar_transport_multi_interface_t *self, yar_transport_interface_t *handle) /* {{{ */ {
(gdb)

php_yar_curl_multi_add_handle

php_yar_curl_multi_add_handle 函数的作用:将同步传输器实例以头插法保存到 chs 单链表中。

// source:transports/curl.c

int php_yar_curl_multi_add_handle(yar_transport_multi_interface_t *self, yar_transport_interface_t *handle) {
    yar_curl_multi_data_t *multi = (yar_curl_multi_data_t *)self->data;
    yar_curl_data_t *data = (yar_curl_data_t *)handle->data;

    // 预处理,将传输数据保存到 curl 中
    php_yar_curl_prepare(handle);

    // 将同步传输器的 curl 实例添加到 curl 批处理实例中
    curl_multi_add_handle(multi->cm, data->cp);

    // 头插法保存
    if (multi->chs) {
        data->next = multi->chs;
        multi->chs = handle;
    } else {
        multi->chs = handle;
    }

    return 1;
}

php_yar_curl_multi_exec

php_yar_curl_multi_exec 函数的作用:通过 epoll 或 select 多路复用机制监控文件描述符的状态,并配合 curl_multi_* 系列函数对请求进行处理。

以下是主要逻辑:

  • 从 curl 批处理实例中取出所有请求的文件描述符。
  • 调用 select 函数监听文件描述符的状态,阻塞当前进程。
  • 进程从休眠状态恢复后,调用 curl_multi_perform 处理响应数据。
  • 调用 php_yar_curl_multi_parse_response 函数解析响应数据。
// source:transports/curl.c

int php_yar_curl_multi_exec(yar_transport_multi_interface_t *self, yar_concurrent_client_callback *f) /* {{{ */ {
    int running_count, rest_count;
    yar_curl_multi_data_t *multi = (yar_curl_multi_data_t *)self->data;

    // 尝试让 libcurl 处理数据,并获取当前正在运行中的数量
    while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi->cm, &running_count));

    // 第一次调用回调函数,让调用方执行自己的逻辑。
    if (!f(NULL, YAR_ERR_OKEY, NULL)) {
        goto bailout;
    }

    if (running_count) {
        rest_count = running_count;
        do {
            int max_fd, return_code;
            struct timeval tv;
            fd_set readfds;
            fd_set writefds;
            fd_set exceptfds;

            FD_ZERO(&readfds);
            FD_ZERO(&writefds);
            FD_ZERO(&exceptfds);
            
            // 从 curl 批处理实例中取出所有请求的文件描述符
            curl_multi_fdset(multi->cm, &readfds, &writefds, &exceptfds, &max_fd);
            if (max_fd == -1) {
                // max_fd 为 -1 说明 libcurl 正在处理一些不能使用套接字监控的事情,
                // 所以需要等待一会再调用 curl_multi_perform
                long timeout;
                curl_multi_timeout(multi->cm, &timeout);
                if (timeout < 0) {
                    timeout = 50;
                }
                if (timeout) {
                    tv.tv_sec = timeout / 1000;
                    tv.tv_usec = (timeout % 1000) * 1000;
                    select(1, &readfds, &writefds, &exceptfds, &tv);
                }
                // 让 libcurl 处理数据
                while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi->cm, &running_count));
                goto process;
            }

            // 计算 select 超时时间
            tv.tv_sec = (zend_ulong)(YAR_G(timeout) / 1000);
            tv.tv_usec = (zend_ulong)((YAR_G(timeout) % 1000)? (YAR_G(timeout) % 1000) * 1000 : 0);

            // 等待请求返回数据
            return_code = select(max_fd + 1, &readfds, &writefds, &exceptfds, &tv);
            if (return_code > 0) {
                // 说明某些文件描述符可读/可写,让 libcurl 进行处理
                while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi->cm, &running_count));
            } else if (-1 == return_code) {
                // 发生异常
                php_error_docref(NULL, E_WARNING, "select error '%s'", strerror(errno));
                goto onerror;
            } else {
                // 等待超时
                php_error_docref(NULL, E_WARNING, "select timeout %ldms reached", YAR_G(timeout));
                goto onerror;
            }
process:
            if (rest_count > running_count) {
                // 解析响应的数据
                int ret = php_yar_curl_multi_parse_response(multi, f);
                if (ret == -1) {
                    goto bailout;
                } else if (ret == 0) {
                    goto onerror;
                }
                rest_count = running_count;
            }
        } while (running_count);
    } else {
        // 第一次尝试让 libcurl 处理数据后,
        // 如果运行中的数量为 0 ,说明已经全部处理完了,
        // 直接进行解析
        int ret = php_yar_curl_multi_parse_response(multi, f);
        if (ret == -1) {
            goto bailout;
        } else if (ret == 0) {
            goto onerror;
        }
    }

    return 1;
onerror:
    return 0;
bailout:
    self->close(self);
    zend_bailout();
    return 0;
}

php_yar_curl_multi_parse_response

在 php_yar_curl_multi_parse_response 函数上打断点,然后继续运行程序。

(gdb) b php_yar_curl_multi_parse_response
Breakpoint 6 at 0x7ffff56eeadb: file /home/vagrant/code/her-cat/yar/transports/curl.c, line 622.
(gdb) c
Continuing.

Breakpoint 6, php_yar_curl_multi_parse_response (multi=0x20b666f03b932500, f=0x16fb)
    at /home/vagrant/code/her-cat/yar/transports/curl.c:622
622    static int php_yar_curl_multi_parse_response(yar_curl_multi_data_t *multi, yar_concurrent_client_callback *f) /* {{{ */ {
(gdb) 

php_yar_curl_multi_parse_response 函数的主要逻辑:

  • 从 curl 批处理实例中取出所有收到的响应数据。
  • 遍历 multi->chs 找到数据对应的同步传输器。
  • 检查响应数据是否正确。
  • 解析响应数据,得到 Yar 协议中的头部及 payload 部分的内容。
  • 调用 php_yar_concurrent_client_callback 函数,执行对应的回调函数。
// source:transports/curl.c

static int php_yar_curl_multi_parse_response(yar_curl_multi_data_t *multi, yar_concurrent_client_callback *f) {
    int msg_in_sequence;
    CURLMsg *msg;

    do {
        // 从 curl 批处理实例中读取一条消息,并返回剩余消息数量(msg_in_sequence)
        msg = curl_multi_info_read(multi->cm, &msg_in_sequence);
        if (msg && msg->msg == CURLMSG_DONE) {
            // 标记是否找到了对应的同步传输器
            unsigned found = 0;
            yar_transport_interface_t *handle = multi->chs, *q = NULL;

            // 遍历单链表
            while (handle) {
                // curl 实例的指针地址相等,说明找到了
                if (msg->easy_handle == ((yar_curl_data_t*)handle->data)->cp) {
                    // 从单链表中移除
                    if (q) {
                        ((yar_curl_data_t *)q->data)->next = ((yar_curl_data_t*)handle->data)->next;
                    } else {
                        multi->chs = ((yar_curl_data_t*)handle->data)->next;
                    }
                    found = 1;
                    break;
                }
                q = handle;
                handle = ((yar_curl_data_t*)handle->data)->next;
            }

            if (found) {
                long http_code = 200;
                yar_response_t *response;
                yar_curl_data_t *data = (yar_curl_data_t *)handle->data;

                // 创建响应实例
                response = php_yar_response_instance();

                if (msg->data.result == CURLE_OK) {
                    curl_multi_remove_handle(multi->cm, data->cp);
                    
                    // 获取 HTTP 状态码
                    if(curl_easy_getinfo(data->cp, CURLINFO_RESPONSE_CODE, &http_code) == CURLE_OK && http_code != 200) {
                        // 非 200 说明出现异常
                        // 返回异常响应
                        continue;
                    } else {
                        // 状态码为 200 
                        if (data->buf.s) {
                            // ...
                            
                            // 从响应数据中解析出 Yar 协议的头部信息
                            if (!(header = php_yar_protocol_parse(payload))) {
                                php_yar_error(response, YAR_ERR_PROTOCOL, "malformed response header '%.32s'", payload);
                            } else {
                                // 跳过 Yar 协议的头部信息后的内容,就是真正的响应结果
                                payload += sizeof(yar_header_t);
                                payload_len -= sizeof(yar_header_t);
                                
                                // 通过 payload 中的编码方式对 payload 进行解码
                                if (!(retval = php_yar_packager_unpack(payload, payload_len, &msg, &ret))) {
                                    php_yar_response_set_error(response, YAR_ERR_PACKAGER, msg, strlen(msg));
                                } else {
                                    // 将解码后的数据存储到 response 中
                                    php_yar_response_map_retval(response, retval);
                                    DEBUG_C(ZEND_ULONG_FMT": server response content packaged by '%.*s', len '%ld', content '%.32s'", response->id, 7, payload, header->body_len, payload + 8);
                                    zval_ptr_dtor(retval);
                                }
                                if (msg) {
                                    efree(msg);
                                }
                            }
                        } else {
                            // 响应内容为空,设置错误信息
                            php_yar_response_set_error(response, YAR_ERR_EMPTY_RESPONSE, ZEND_STRL("empty response"));
                        }

                        // 调用 php_yar_concurrent_client_callback 函数
                        if (!f(data->calldata, response->status, response)) {
                            handle->close(handle);
                            php_yar_response_destroy(response);
                            return -1;
                        }
                        if (EG(exception)) {
                            handle->close(handle);
                            php_yar_response_destroy(response);
                            return 0;
                        }
                    }
                } else {
                    // 执行请求失败,返回失败原因
                    char *err = (char *)curl_easy_strerror(msg->data.result);
                    php_yar_response_set_error(response, YAR_ERR_TRANSPORT, err, strlen(err));
                    if (!f(data->calldata, YAR_ERR_TRANSPORT, response)) {
                        handle->close(handle);
                        php_yar_response_destroy(response);
                        return -1;
                    }
                    if (EG(exception)) {
                        handle->close(handle);
                        php_yar_response_destroy(response);
                        return 0;
                    }
                }
                handle->close(handle);
                php_yar_response_destroy(response);
            } else {
                php_error_docref(NULL, E_WARNING, "unexpected transport info missed");
            }
        }
    } while (msg_in_sequence);

    return 1;
}

php_yar_concurrent_client_callback

该函数的作用:通过响应状态判断本次远程调用的结果,执行相应的回调函数。

主要逻辑:

  • 通过响应状态获取要执行的 PHP 回调函数。
  • 组装被调用函数的参数。
  • 执行 PHP 回调函数。
// source:yar_client.c

int php_yar_concurrent_client_callback(yar_call_data_t *calldata, int status, yar_response_t *response) /* {{{ */ {
    zval code, retval, retval_ptr;
    zval callinfo, *callback, func_params[3];
    zend_bool bailout = 0;
    unsigned params_count, i;

    if (calldata) {
        // 通过响应状态获取要执行的 PHP 回调函数
        if (status == YAR_ERR_OKEY) {
            if (!Z_ISUNDEF(calldata->callback)) {
                callback = &calldata->callback;
            } else {
                callback = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callback"), 0);
            }
            params_count = 2;
        } else {
            if (!Z_ISUNDEF(calldata->ecallback)) {
                callback = &calldata->ecallback;
            } else {
                callback = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_error_callback"), 0);
            }
            params_count = 3;
        }

        // 没获取到回调函数就提示错误信息
        if (Z_ISNULL_P(callback)) {
            if (status != YAR_ERR_OKEY) {
                if (!Z_ISUNDEF(response->err)) {
                    php_yar_client_handle_error(0, response);
                } else {
                    php_error_docref(NULL, E_WARNING, "[%d]:unknown Error", status);
                }
            } else if (!Z_ISUNDEF(response->retval)) {
                zend_print_zval(&response->retval, 1);
            }
            return 1;
        }

        if (status == YAR_ERR_OKEY) {
            // 响应状态是成功但是返回值为空,提示错误信息。
            if (Z_ISUNDEF(response->retval)) {
                php_yar_client_trigger_error(0, YAR_ERR_REQUEST, "%s", "server responsed empty response");
                return 1;
            }
            // 复制返回值
            ZVAL_COPY(&retval, &response->retval);
        } else {
            // 复制状态及错误信息
            ZVAL_LONG(&code, status);
            ZVAL_COPY(&retval, &response->err);
        }

        // 初始化调用信息
        array_init(&callinfo);

        add_assoc_long_ex(&callinfo, "sequence", sizeof("sequence") - 1, calldata->sequence);
        add_assoc_str_ex(&callinfo, "uri", sizeof("uri") - 1, zend_string_copy(calldata->uri));
        add_assoc_str_ex(&callinfo, "method", sizeof("method") - 1, zend_string_copy(calldata->method));
    } else {
        // 调用数据为空则获取并行客户端的回调函数
        callback = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callback"), 0);
        if (Z_ISNULL_P(callback)) {
            return 1;
        }
        params_count = 2;
    }

    if (calldata && (status != YAR_ERR_OKEY)) {
        // 调用失败
        ZVAL_COPY_VALUE(&func_params[0], &code);
        ZVAL_COPY_VALUE(&func_params[1], &retval);
        ZVAL_COPY_VALUE(&func_params[2], &callinfo);
    } else if (calldata) {
        // 调用成功
        ZVAL_COPY_VALUE(&func_params[0], &retval);
        ZVAL_COPY_VALUE(&func_params[1], &callinfo);
    } else {
        // 调用数据为空
        ZVAL_NULL(&func_params[0]);
        ZVAL_NULL(&func_params[1]);
    }

    zend_try {
        // 执行回调函数
        if (call_user_function(EG(function_table), NULL, callback, &retval_ptr, params_count, func_params) != SUCCESS) {
            for (i = 0; i < params_count; i++) {
                zval_ptr_dtor(&func_params[i]);    
            }
            if (calldata) {
                php_error_docref(NULL, E_WARNING, "call to callback failed for request: '%s'", ZSTR_VAL(calldata->method));
            } else {
                php_error_docref(NULL, E_WARNING, "call to initial callback failed");
            }
            return 1;
        }
    } zend_catch {
        bailout = 1;
    } zend_end_try();

    // 释放返回值及参数
    if (!Z_ISUNDEF(retval_ptr)) {
        zval_ptr_dtor(&retval_ptr);
    }

    for (i = 0; i < params_count; i++) {
        zval_ptr_dtor(&func_params[i]);    
    }
    return bailout? 0 : 1;
}

打印 retval 可以看到服务端的返回值:success。

(gdb) p *retval.value.str.val@7
$28 = "success"

总结

本文介绍了实现并行调用相关的数据结构,并通过 GDB 调试代码的方式展示了并行调用运行的过程。

本文作者:她和她的猫
本文地址https://her-cat.com/posts/2022/01/14/yar-internals-client-concurrent-call/
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!