通过 Swoole\Table 实现 Swoole 多进程数据共享

  • 2022-07-10 21:22:19

第三方存储媒介

前面我们介绍了基于 swoole 的 process 及 process\pool 模块在 php 中实现多进程管理,但是多进程模式下进程间是相互隔离的,无法共享数据和变量,即便是通过 global 定义的全局或超全局变量,也只是在所属进程中有效,如果要在 swoole 实现的多进程间共享数据,需要借助第三方存储媒介实现:

  • 数据库:mysql、mongodb
  • 缓存:redis、memcached
  • 磁盘文件

但是这也会引入新的问题,多进程同时操作一条记录或一个文件存在并发访问问题,以数据库操作为例,两个进程可能会同时读取一条数据,或者一个进程对某条记录进行更新处理时,另一个进程也来读取这条记录并进行操作,会导致最终结果数据与预期不一致的情况,这个时候,我们就需要引入锁的概念,当一个进程(比如进程a)对某个记录进行写操作时,对该记录加锁,这样其它进程就无法操作该条记录, 直到进程 a 事务提交再释放这个锁,让其他进程可以进行操作。

内存共享

php 相关扩展

对于单机操作来说,除了这些第三方存储媒介之外,还可以通过共享内存的方式实现进程间数据读写操作,有多个 php 扩展可以支持共享内存数据操作:

  • semaphore 扩展:可通过该扩展包提供的 shm_get_var 和 shm_put_var 函数实现内存共享数据的读写操作;
  • shmop 扩展:可通过该扩展包提供的 shmop_read 和 shmop_write 函数实现内存共享数据的读写操作;
  • apcu(apc user cache)扩展:可通过该扩展包提供的 apc_fetch 和 apc_store 实现内存共享数据的读写操作。

swoole table

但是上述扩展要么不支持锁,要么高并发时性能比较差,所以 swoole 自己实现了一个共享内存读写工具 —— swoole\table,该工具是一个基于共享内存和锁实现的高性能并发数据结构,可用于解决多进程/多线程数据共享和同步加锁问题:

  • 性能强悍,单线程每秒可读写200万次;
  • 应用代码无需加锁,内置行锁自旋锁,所有操作均是多线程/多进程安全,用户层完全不需要考虑数据同步问题;
  • 支持多进程,可用于多进程之间共享数据;
  • 使用行锁,而不是全局锁,仅当 2 个进程在同一 cpu 时间,并发读取同一条数据才会进行发生抢锁。

swoole\table 支持以 key-value 方式读写,使用起来非常简单:

<?php

// 初始化一个容量为 1024 的 swoole table
$table = new \swoole\table(1024);
// 在 table 中新增 id 列
$table->column('id', \swoole\table::type_int);
// 在 table 中新增 name 列,长度为 50
$table->column('name', \swoole\table::type_string, 10);
// 在 table 中新泽 score 列
$table->column('score', \swoole\table::type_float);
// 创建这个 swoole table
$table->create();


// 设置 key-value 值
$table->set('student-1', ['id' => 1, 'name' => '学小君', 'score' => 80]);
$table->set('student-2', ['id' => 2, 'name' => '学院君', 'score' => 90]);

// 如果指定 key 值存在则打印对应 value 值
if ($table->exist('student-1')) {
    echo "student-" . $table->get('student-1', 'id') . ':' . $table->get('student-1', 'name').":".
        $table->get('student-1', 'score') . "\n";
}

// 自增操作
$table->incr('student-2', 'score', 5);
// 自减操作
$table->decr('student-2', 'score', 5);

// 表中总记录数
$count = $table->count();

// 删除指定表记录
$table->del('student-1');

  

此外 swoole\table 类还实现了迭代器接口,支持通过 foreach 进行遍历。

在 laravel 中使用 swoole\table

如果要在 laravel 中集成 swoole 使用 swoole\table,以 laravels 扩展包为例,首先要在配置文件 config/laravels.php 中定义 swoole_tables 配置项:

'swoole_tables'            => [
    'ws' => [ // 表名,会加上 table 后缀,比如这里是 wstable
        'size'   => 102400, //  表容量
        'column' => [ // 表字段,字段名为 value
            ['name' => 'value', 'type' => \swoole\table::type_int, 'size' => 8],
        ],
    ],
    ... // 还可以定义其它表
],

  

然后我们可以在代码中通过swoole实例上的wstable属性访问 swooletable:

class websocketservice implements websockethandlerinterface
{
    ...

    // 连接建立时触发
    public function onopen(server $server, request $request)
    {
        // 在触发 websocket 连接建立事件之前,laravel 应用初始化的生命周期已经结束,你可以在这里获取 laravel 请求和会话数据
        // 调用 push 方法向客户端推送数据,fd 是客户端连接标识字段
        log::info('websocket 连接建立:' . $request->fd);
        app('swoole')->wstable->set('fd:' . $request->fd, ['value' => $request->fd]);
        $server->push($request->fd, 'welcome to websocket server built on laravels');
    }

    // 收到消息时触发
    public function onmessage(server $server, frame $frame)
    {
        foreach (app('swoole')->wstable as $key => $row) {
            if (strpos($key, 'fd:') === 0 && $server->exist($row['value'])) {
                log::info('receive message from client: ' . $row['value']);
                // 调用 push 方法向客户端推送数据
                $server->push($frame->fd, 'this is a message sent from websocket server at ' . date('y-m-d h:i:s'));
            }
        }
    }
    
    ...

}

  

然后我们参考在 laravel 中集成 swoole 实现 websocket 服务器这篇教程从客户端向 websocket 服务器发起请求,即可在最新日志文件中看到相应的日志信息:

[2020-04-24 19:39:03] local.info: websocket 连接建立:1  
[2020-04-24 19:39:07] local.info: receive message from client: 1

  



猜你喜欢