官方文档websocket基于http_server,如果我们起了websocket脚本 同样也是拥有http_server的特性.首先看下文档内容.

  • WebSocket\Server 继承自 Http\Server
  • 设置了onRequest回调,WebSocket\Server也可以同时作为http服务器
  • 未设置onRequest回调,WebSocket\Server收到http请求后会返回http 400错误页面
  • 如果想通过接收http触发所有websocket的推送,需要注意作用域的问题,面向过程请使用globalWebSocket\Server进行引用,面向对象可以把WebSocket\Server设置成一个成员属性.

创建websock

  在server目录下创建websocket.php,可以直接复制server/http.php的内容,毕竟websocket基于http嘛.注意websocket还需要两个回调openmessage.

class Ws
{
    CONST HOST = '0.0.0.0';
    CONST PORT = 9503;
    CONST CHAT_PORT = 9504;
    CONST LIVE_PORT = 9505;
    public $websocket = null;
    public $document_root = "/Users/qvbilam/Sites/liveTelecast/public/static";
    public $set_redis_key = '';

    public function __construct()
    {
        $this->websocket = new \Swoole\WebSocket\Server(self::HOST, self::PORT);
        $this->websocket->listen(self::HOST, self::CHAT_PORT, SWOOLE_SOCK_TCP);
        $this->websocket->listen(self::HOST, self::LIVE_PORT, SWOOLE_SOCK_TCP);
        $this->websocket->set([
            'enable_static_handler' => true,
            'document_root' => $this->document_root,
            'worker_num' => 4,
            'task_worker_num' => 4
        ]);
        $this->websocket->on('open', [$this, 'onOpen']);
        $this->websocket->on('message', [$this, 'onMessage']);
        $this->websocket->on('workerstart', [$this, 'onWorkerStart']);
        $this->websocket->on('request', [$this, 'onRequest']);
        $this->websocket->on('task', [$this, 'onTask']);
        $this->websocket->on('finish', [$this, 'onFinish']);
        $this->websocket->on('close', [$this, 'onclose']);
        $this->websocket->start();

    }

    /*
     * */
    public function onWorkerStart($server, $worker_id)
    {
        // 定义应用目录 index.php
        define('APP_PATH', __DIR__ . '/../application/');
        // 再去加载php的引导文件  不直接复制index.php的文件中的引入start.php.是因为在start.php中还有执行应用我们不需要。所以直接引入base.php就行
        // x加载基础文件
        // require __DIR__ . '/../thinkphp/base.php';
        // 直接引入这个才可以是用tp的功能。要不然回找不到下面app的类。只要让index/index/index人 turn空就行
        require __DIR__ . '/../thinkphp/start.php';

        $this->set_redis_key = \think\Config::get('redis.live_game_key');
        /*判断有没有链接用户。如果有全部清空*/
        $smembers = \app\command\Predis::getIntance()->sMembers($this->set_redis_key);
        $srem = implode(' ', $smembers);
        \app\command\Predis::getIntance()->sRem($this->set_redis_key, $srem);
    }

    /*request 回调*/
    public function onRequest($request, $response)
    {
        /*
         * 处理请求的一种方法哟~
        if($request->server['request_uri'] == '/xxx'){
            // 返回404 并结束。如果不用end() 会报500。这个请求就协程自己的吧
            $response->status(404);
            $response->end();
            return ;
        }*/

        $_SERVER = [];
        if (isset($request->server)) {
            foreach ($request->server as $key => $val) {
                $_SERVER[strtoupper($key)] = $val;
            }
        }
        if (isset($request->header)) {
            foreach ($request->header as $key => $val) {
                $_SERVER[strtoupper($key)] = $val;
            }
        }
        $_GET = [];
        if (isset($request->get)) {
            foreach ($request->get as $key => $val) {
                $_GET[$key] = $val;
            }
        }
        $_POST = [];
        if (isset($request->post)) {
            foreach ($request->post as $key => $val) {
                $_POST[$key] = $val;
            }
        }

        $_FILES = [];
        if (isset($request->files)) {
            foreach ($request->files as $key => $val) {
                $_FILES[$key] = $val;
            }
        }
        /*这样在别的地方就可以全局时候httpserver的东西了*/
        $_POST['http_server'] = $this->websocket;

        // 执行应用
        ob_start();
        try {
            think\App::run()->send();
        } catch (\Exception $e) {
            /*可以输出一些错误。打错误日志什么的。根据自己业务吧*/
        }
        $rst = ob_get_contents();
        ob_end_clean();
        $response->end($rst);
    }

    public function onTask($server, $taskId, $workerId, $data)
    {
        /*分发task任务 让不同任务走不同逻辑*/
        $obj = new app\command\Task();
        $method = $data['method'];
        $flag = $obj->$method($data['data']);
        return $flag;
    }

    public function onFinish($server, $taskId, $data)
    {
        echo "finish-data-success:{$data}\n";
    }

    public function onOpen($ws, $frame)
    {
        echo $frame->fd;
    }

    public function onMessage($ws, $frame)
    {
        $ws->push($frame->fd, 'push内容');
    }

    public function onClose($ws, $fd)
    {
        /*删除断开链接的用户*/
        \app\command\Predis::getIntance()->srem($this->set_redis_key, $fd);
        echo $fd . '/close' . PHP_EOL;
    }
}

new Ws();

图片上传

前端逻辑

  对于websocket的测试和使用就用我做的图文直播来演示吧.

// 在websocket.php设置的静态资源目录
public $document_root = "/Users/qvbilam/Sites/liveTelecast/public/static";
// 所以我们将所有的静态资源都放到static目录下

创建public/static/admin/live.html用于做图片上传.

<header>
    ...
    <link rel="stylesheet" type="text/css" href="../webuploader/webuploader.css">
    <script type="text/javascript" src="../webuploader/webuploader.js"></script>
</header>

<body>
    <div class="layui-form-item layui-form-text">
        <label for="desc" class="layui-form-label">
            赛况图
        </label>
        <!--dom结构部分-->
        <div id="uploader-demo">
            <!--用来存放item-->
            <div id="fileList" class="uploader-list"></div>
            <div id="filePicker">选择图片</div>
        </div>
    </div>
</body>

<script>
    //获取当前的请求地址
    var host = window.location.host;
    //获取当前协议
    var agreement = window.location.protocol;

    var send_url = agreement + '//' + host + '/admin/live/data'
    var getData = window.location.search
    var getData = getData.substr(1) //去掉?asd=123前面的问好


    /*获取直播的数据*/
    $.ajax({
        type: "get",
        url: send_url,
        data: getData,
        success: function (data) {
            data = JSON.parse(data)
            if (data.msg == 'ok') {
                /*todo*/
                html = '<option value="0">请选择</option>'
                html += '<option value="' + data.data.a_id + '">' + data.data.a_name + '</option>'
                html += '<option value="' + data.data.b_id + '">' + data.data.b_name + '</option>'
                $('#team_id').append(html);

            } else {
                alert(data.data)
            }
        }
    });
  /*图片上传*/
    var $list = $("#fileList")


    ratio = window.devicePixelRatio || 1
    thumbnailWidth = 100 * ratio
    thumbnailHeight = 100 * ratio


    // 初始化Web Uploader
    var uploader = WebUploader.create({

        // 选完文件后,是否自动上传。
        auto: true,

        // swf文件路径
        swf: agreement + '//' + host + '/webuploader/Uploader.swf',

        // 文件接收服务端。
        server: agreement + '//' + host + '/admin/image/index',

        // 选择文件的按钮。可选。
        // 内部根据当前运行是创建,可能是input元素,也可能是flash.
        pick: '#filePicker',

        // 只允许选择图片文件。
        accept: {
            title: 'Images',
            extensions: 'gif,jpg,jpeg,bmp,png',
            mimeTypes: 'image/*'
        }
    });

    // 当有文件添加进来的时候
    uploader.on('fileQueued', function (file) {
        var $li = $(
            '<div id="' + file.id + '" class="file-item thumbnail">' +
            '<img>' +
            '</div>'
            ),
            $img = $li.find('img');


        // $list为容器jQuery实例
        $list.append($li);

        // 创建缩略图
        // 如果为非图片文件,可以不用调用此方法。
        // thumbnailWidth x thumbnailHeight 为 100 x 100
        uploader.makeThumb(file, function (error, src) {
            if (error) {
                $img.replaceWith('<span>不能预览</span>');
                return;
            }

            $img.attr('src', src);
        }, thumbnailWidth, thumbnailHeight);
    });

    // 文件上传过程中创建进度条实时显示。
    uploader.on('uploadProgress', function (file, percentage) {
        var $li = $('#' + file.id),
            $percent = $li.find('.progress span');

        // 避免重复创建
        if (!$percent.length) {
            $percent = $('<p class="progress"><span></span></p>')
                .appendTo($li)
                .find('span');
        }

        $percent.css('width', percentage * 100 + '%');
    });

    // 文件上传成功,给item添加成功class, 用样式标记上传成功。
    uploader.on('uploadSuccess', function (file, response) {
        if (response.code == 0) {
            console.log(response.data.image)
            $('#' + file.id).append("<input type='hidden' name='image' value='" + response.data.image + "' />")
        }
        $('#' + file.id).addClass('upload-state-done');
    });

    // 文件上传失败,显示上传出错。
    uploader.on('uploadError', function (file) {
        var $li = $('#' + file.id),
            $error = $li.find('div.error');

        // 避免重复创建
        if (!$error.length) {
            $error = $('<div class="error"></div>').appendTo($li);
        }

        $error.text('上传失败');
    });

    // 完成上传完了,成功或者失败,先删除进度条。
    uploader.on('uploadComplete', function (file) {
        $('#' + file.id).find('.progress').remove();
    });


    var $submitBtn = $('#submit-btn');
    // 提交表单
    $submitBtn.click(function (event) {
        event.preventDefault();
        var formData = $('form').serialize();
        // TODO: 请求后台接口跳转界面,前端跳转或者后台跳
        //  ?fight_id=1&type=1&team_id=0&content=
        // console.log(agreement + '//' + host + "/admin/live/push?" + getData + "&" + formData)
        console.log($('#fileList').html());
        $.get(agreement + '//' + host + "/admin/live/push?" + getData + "&" + formData, function (data) {

            if (data.status == 1) {
                // 登录成功
            }
            // location.href='index.html';
        }, 'json');
    });
</script>

  可以看到在live.html中涉及到了服务端的上传server: agreement + '//' + host + '/admin/image/index',.所以现在需要对上传的图片进行处理~

后端逻辑

  创建application/admin/controller/Image.php

namespace app\admin\controller;

class Image
{
    // 图片上传
    public function index()
    {
       print_r($_FILES);
    }
}

-- 如果打印出来空的,可能是在websocket中没有设置全局变量.

# 解决在server/websocket.php的onRequest添加如下
$_FILES = [];
if (isset($request->files)) {
    foreach ($request->files as $key => $val) {
        $_FILES[$key] = $val;
    }
}

# 返回结果
[file] => Array
(
    [name] => fate_emiya.jpeg
    [type] => image/jpeg
    [tmp_name] => /tmp/swoole
    [error] => 0
    [size] => 129625
)

  修改服务端代码application/admin/controller/Image.php

namespace app\admin\controller;

use app\command\Util;
use think\Config;
use \Upyun\Upyun;
use \Upyun\Config as UConfig;

class Image
{

    public function index()
    {
        $file = $_FILES['file'];
        /*获取最后一个小数点后的所有字符串*/
        $type = strrchr($file['name'], '.');
        $name = self::str_rand(5);
        $res = move_uploaded_file($file['tmp_name'], '../public/static/upload/' . $name . $type);
        if ($res) {
            return Util::show(Config::get('code.success'), 'ok', [
                'image' => Config::get('web.image_host') . '/upload/' . $name . $type]);
        } else {
            return Util::show(Config::get('code.error_upload_image'), 'error');
        }
    }

    /*返回m长度的字符串*/
    static public function str_rand($m)
    {
        $new_str = '';
        $str = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwsyz0123456789';
        for ($i = 1; $i <= $m; ++$i) {
            $new_str .= $str[mt_rand(0, 61)];
        }
        return $new_str;
    }
}

消息推送

  把websocket服务赋值给了$_POST['http_server']可以让websocket的方法全局使用,这里直接调用push推送给客户端做链接测试.

// $_POST['http_server']设置
public function onRequest($request, $response)
{
    ...
    $_POST['http_server'] = $this->websocket;  
    ...
}

  创建application/admin/controller/Live.php用于向用户消息推送

namespace app\admin\controller;

class Live
{
    /* 直播赛况表单上传 */
    public function push()
    {
        // 当后端发送数据后推送给前端用户
        // 给链接id为2的用户推送消息
        $_POST['http_server']->push(2,'我推送消息了哦');
    }  
}

  在detail页面引入链接websocketjs文件

var WsUrl = "ws://39.97.177.28:9505"
var websocketLive = new WebSocket(WsUrl);

//链接websock服务
websocketLive.onopen = function (evt) {
    console.log("live connet success")
    //向服务端发送消息hhhh
    // websocket.send('hhhhhh');
}

//接受服务端消息
websocketLive.onmessage = function (evt) {
    //获取服务端传来的数据push
    messagePush(evt.data)
}

websocketLive.onclose = function (evt) {
    console.log("cloes")
}

websocketLive.onerror = function (evt, e) {
    console.log(evt.data)
}

  然后将内容发送请求到admin/live/push中就可以在页面的Console看到推送的消息了哦~

用户处理

第一种方式

  官网给出的方案.官方文档位置server—>函数列表—>Server::$connections.感兴趣可以自己试试,我就不耍了~

foreach($server->connections as $fd)
{
    $server->send($fd, "hello");
}

echo "当前服务器共有 ".count($server->connections). " 个连接\n";

第二种方式

  将用户存储到redis的有序集合中,中app\command\Predis.php

-- 第一种写法
/*添加有序合集*/
public function sadd($key,$value)
{
    return $this->redis->sadd($key,$value);
}

-- 第二种写法,推荐
  
public function __call($name,$arguments)
{
  return $this->redis->$name(...$arguments);
}

  在server/websocket.php服务中添加监听用户链接方式中存放fd.

public function __construct()
{
    ...
    $this->websocket->on('open', [$this, 'onOpen']);
    $this->websocket->on('message', [$this, 'onMessage']);  
    ...
}

public function onOpen($ws, $frame)
{
    // 将用户存放到redis
    \app\command\Predis::getIntance()->sadd($this->set_redis_key, $frame->fd);
    echo $frame->fd;
}

public function onMessage($ws, $frame)
{
    $ws->push($frame->fd, 'push内容');
}

public function onClose($ws, $fd)
{
    /*删除断开链接的用户*/
    \app\command\Predis::getIntance()->srem($this->set_redis_key, $fd);
    echo $fd . '/close' . PHP_EOL;
}

  链接测试

127.0.0.1:6379> smembers live_game_key
1) "3"
127.0.0.1:6379> smembers live_game_key
1) "3"
2) "6"

  测试成功,但是还有一个小小的问题.重启服务后,之前的链接数还是会在有序集合中,问题也不是很大.每次启动服务判断一下是否还有链接在有序集合中.在server/websocket.php添加下面这段代码

public function onWorkerStart($server, $worker_id)
{
    ...
    /*判断有没有链接用户。如果有全部清空*/
    $smembers = \app\command\Predis::getIntance()->sMembers($this->set_redis_key);
    $srem = implode(' ', $smembers);
    \app\command\Predis::getIntance()->sRem($this->set_redis_key, $srem);
}

  在调用thinkphp框架的内容后再使用命名空间找类和方法,刚开始我写在__construct死活都获取不到这个key一定要注意哈!之前也说过的.

Last modification:February 18th, 2020 at 10:20 pm