php swoole的并发查询封装

Tony哥
2023-10-13 / 0 评论 / 233 阅读 / 正在检测是否收录...

说明:

这是基于swoole_process封装的一个并发执行类,代码实现您可以做下参考!
个人在项目上用还是有挺好用的,使用时需要注意的是如果您使用的不是laravels框架,那么要注意以下两个地址根据您的框架做下调整:

  1. DB::disconnect(); 看您的框架使用哪个方法,意思是每创建一个进程需要断开重连一个mysql否则可能会造成报错,当然如果你的并发中不牵扯到sql查询可以注释掉这个
  2. Cache:: 缓存支持,由于通道支持处理的字符有限,我们通过缓存来实现数据的交换,根据您的框架支持或干脆使用redis实现数据交换即可,这里您可以自行改造。

好了我们直接上代码:

    /**
     * 并发请求类 - 不支持并发将自动使用串行
     * @return object {"getKey":"获取key",“getCount”:"获取并发数","add":"添加并发任务","run":"开始运行任务并返回结果","onError":"监听错误信息","onSuccess":"监听成功信息"}
     * @example $P = cdpProcess();
     *          $P->add(
     *              function(){
     *                  return 任务结果
     *              },
     *              "[可选]自定义id" //返回任务结果键名将为此值
     *          );
     *          $data = $P->run(); //开始任务
     */
    function cdpProcess()
    {
        return new class {
            private $uuid;
            private $installed = 0;
            private $worker = [];
            private $errCall = [];
            private $dataCall = [];
            private $index = 0;

            public function __construct()
            {
                if (function_exists("swoole_version")) {
                    $this->installed = 1;
                }
                $this->uuid = uniqid(mt_rand(1, 100));
            }

            public function getKey(): string
            {
                return $this->uuid;
            }

            public function getCount(): int
            {
                return count($this->worker);
            }

            /**
             * 添加执行方法进入队列
             * @param callable $callback 任务逻辑回调
             * @param int|string $index_name 可选任务自定义id否则
             * @return $this
             */
            public function Add(callable $callback, $index_name = null)
            {
                $key = $this->getKey()."_".$this->index;
                if ($this->installed) {
                    $pro = new \swoole_process(function (\swoole_process $work) use ($callback, $index_name, $key) {
                        try {
                            DB::disconnect(); //始终使用新连接规避并发冲突
                            $d = $callback($index_name);
                            Cache::put(
                                $index = 'Process:' . $key,
                                json_encode($d, 320),
                                5
                            );
                            $work->write('{"key":"' . $index . '","index_name":"' . $index_name . '"}');
                        } catch (\Exception $e) {
                            $work->write(json_encode(['err' => $e->getMessage()]));
                        }
                        $work->exit(0);
                    }, true);
                    $this->worker[] = $pro;
                } else {
                    $this->worker[] = new class($callback, $index_name) {
                        private $callback;
                        private $index_name;

                        public function __construct(callable $callback, $index_name = null)
                        {
                            $this->callback = $callback;
                            $this->index_name = $index_name;
                        }

                        public function start()
                        {
                            try {
                                $d = ($this->callback)($this->index_name);
                            } catch (\Exception $e) {
                                $d = ['err' => $e->getMessage()];
                            }
                            return [$d, $this->index_name];
                        }
                    };
                }
                $this->index++;
                return $this;
            }

            /**
             * 错误回调
             * @param callable $errCallBack
             * @return $this
             */
            public function onError(callable $errCallBack)
            {
                $this->errCall[] = $errCallBack;
                return $this;
            }

            /**
             * 完成数据回调
             * @param callable $dataCallBack
             * @return $this
             */
            public function onSuccess(callable $dataCallBack)
            {
                $this->dataCall[] = $dataCallBack;
                return $this;
            }

            /**
             * 启动并返回结果
             * @return array
             */
            public function run(): array
            {
                $data = $err = [];
                foreach ($this->worker as $i => $pro) {
                    $s = $pro->start();
                    if (!$this->installed) {
                        empty($s[1]) || $i = $s[1];
                        if (!empty($s[0]['err']))
                            $err[$i] = $s[0]['err'];
                        else
                            $data[$i] = $s[0];
                    }
                }
                if ($this->installed) {
                    foreach ($this->worker as $i => $pro) {
                        if (!empty($d = $pro->read())) {
                            $d = json_decode($d, 1);
                            empty($d["index_name"]) || $i = $d["index_name"];
                            if (!empty($d['err']))
                                $err[$i] = $d['err'];
                            else {
                                $data[$i] = json_decode(Cache::pull($d['key'], null), 1);
                            }
                        }
                        unset($this->worker[$i]);
                    }
                    while (\swoole_process::wait());
                }
                empty($err) && $this->ErrCallback($err);
                $this->DataCallback($data);
                return $data;
            }


            private function ErrCallback(array $err)
            {
                foreach ($this->errCall as $fun) {
                    try {
                        $fun($err);
                    } catch (\Exception $e) {
                    }
                }
            }

            private function DataCallback(array $data)
            {
                foreach ($this->dataCall as $fun) {
                    try {
                        $fun($data);
                    } catch (\Exception $e) {
                    }
                }
            }
        };
    }
0

评论 (0)

取消