说明:
这是基于swoole_process封装的一个并发执行类,代码实现您可以做下参考!
个人在项目上用还是有挺好用的,使用时需要注意的是如果您使用的不是laravels框架,那么要注意以下两个地址根据您的框架做下调整:
- DB::disconnect(); 看您的框架使用哪个方法,意思是每创建一个进程需要断开重连一个mysql否则可能会造成报错,当然如果你的并发中不牵扯到sql查询可以注释掉这个
- Cache:: 缓存支持,由于通道支持处理的字符有限,我们通过缓存来实现数据的交换,根据您的框架支持或干脆使用redis实现数据交换即可,这里您可以自行改造。
好了我们直接上代码:
/**
* 并发请求类 - 不支持并发将自动使用串行
* @return object {"getKey":"获取key",“getCount”:"获取并发数","add":"添加并发任务","run":"开始运行任务并返回结果","onError":"监听错误信息","onSuccess":"监听成功信息"}
* @example $P = cdpProcess();
* $P->add(
* function(){
* return 任务结果
* },
* "[可选]自定义id" //返回任务结果键名将为此值
* );
* $data = $P->run(); //开始任务
*/
function cdpProcess()
{
return new class {
private $uuid;
private $installed = 0;
private $worker = [];
private $errCall = [];
private $dataCall = [];
private $index = 0;
public function __construct()
{
if (function_exists("swoole_version")) {
$this->installed = 1;
}
$this->uuid = uniqid(mt_rand(1, 100));
}
public function getKey(): string
{
return $this->uuid;
}
public function getCount(): int
{
return count($this->worker);
}
/**
* 添加执行方法进入队列
* @param callable $callback 任务逻辑回调
* @param int|string $index_name 可选任务自定义id否则
* @return $this
*/
public function Add(callable $callback, $index_name = null)
{
$key = $this->getKey()."_".$this->index;
if ($this->installed) {
$pro = new \swoole_process(function (\swoole_process $work) use ($callback, $index_name, $key) {
try {
DB::disconnect(); //始终使用新连接规避并发冲突
$d = $callback($index_name);
Cache::put(
$index = 'Process:' . $key,
json_encode($d, 320),
5
);
$work->write('{"key":"' . $index . '","index_name":"' . $index_name . '"}');
} catch (\Exception $e) {
$work->write(json_encode(['err' => $e->getMessage()]));
}
$work->exit(0);
}, true);
$this->worker[] = $pro;
} else {
$this->worker[] = new class($callback, $index_name) {
private $callback;
private $index_name;
public function __construct(callable $callback, $index_name = null)
{
$this->callback = $callback;
$this->index_name = $index_name;
}
public function start()
{
try {
$d = ($this->callback)($this->index_name);
} catch (\Exception $e) {
$d = ['err' => $e->getMessage()];
}
return [$d, $this->index_name];
}
};
}
$this->index++;
return $this;
}
/**
* 错误回调
* @param callable $errCallBack
* @return $this
*/
public function onError(callable $errCallBack)
{
$this->errCall[] = $errCallBack;
return $this;
}
/**
* 完成数据回调
* @param callable $dataCallBack
* @return $this
*/
public function onSuccess(callable $dataCallBack)
{
$this->dataCall[] = $dataCallBack;
return $this;
}
/**
* 启动并返回结果
* @return array
*/
public function run(): array
{
$data = $err = [];
foreach ($this->worker as $i => $pro) {
$s = $pro->start();
if (!$this->installed) {
empty($s[1]) || $i = $s[1];
if (!empty($s[0]['err']))
$err[$i] = $s[0]['err'];
else
$data[$i] = $s[0];
}
}
if ($this->installed) {
foreach ($this->worker as $i => $pro) {
if (!empty($d = $pro->read())) {
$d = json_decode($d, 1);
empty($d["index_name"]) || $i = $d["index_name"];
if (!empty($d['err']))
$err[$i] = $d['err'];
else {
$data[$i] = json_decode(Cache::pull($d['key'], null), 1);
}
}
unset($this->worker[$i]);
}
while (\swoole_process::wait());
}
empty($err) && $this->ErrCallback($err);
$this->DataCallback($data);
return $data;
}
private function ErrCallback(array $err)
{
foreach ($this->errCall as $fun) {
try {
$fun($err);
} catch (\Exception $e) {
}
}
}
private function DataCallback(array $data)
{
foreach ($this->dataCall as $fun) {
try {
$fun($data);
} catch (\Exception $e) {
}
}
}
};
}
评论 (0)