| 状态 | 注释 |
|---|---|
| delayed | 延迟状态 |
| ready | 准备好状态 |
| reserved | 消费者把任务读出来,处理时 |
| buried | 预留状态 |
| delete | 删除状态 |
管理工具
亲测了很多网上能找到的 beanstalkd 工具,这两款是我最中意的了,一个命令行,一个 web 的。
命令行:https://github.com/src-d/beanstool
web 界面:https://github.com/ptrofimov/beanstalk_console
编程语言客户端
PHP 客户端
https://packagist.org/packages/pda/pheanstalk
composer require pda/pheanstalk
写入 job
?php
//创建队列消息
require_once('./vendor/autoload.php');
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
$tubeName = 'email_list';
$jobData = [
'email' => '123456@163.com',
'message' => 'Hello World !!',
'dtime' => date('Y-m-d H:i:s'),
];
$pheanstalk->useTube( $tubeName)->put( json_encode( $jobData ) );
消费 job
?php
ini_set('default_socket_timeout', 86400*7);
ini_set( 'memory_limit', '256M' );
// 消费队列消息
require_once('./vendor/autoload.php');
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
$tubeName = 'email_list';
while ( true )
{
// 获取队列信息, reserve 阻塞获取
$job = $pheanstalk->watch( $tubeName )->ignore( 'default' )->reserve();
if ( $job !== false )
{
$data = $job->getData();
/* TODO 逻辑操作 */
/* 处理完成,删除 job */
$pheanstalk->delete( $job );
}
}
default_socket_timeout 这个参数是一定要加的,php 默认一般是 60s,假如您没有在代码里面设置,采用默认的话(60s),60s 之内如果没有 job 产生,脚本就会报 socket 错误,我写的是 7 天超时,您可以根据业务去调整,记住一定要配置,网上很多搜的 consumer 脚本都没有配置这个,根本不能投入生产环境使用,这是我亲自实践的结果。
关于 while true 是否死循环,很明确告诉你是死循环,但是不会一直耗性能的那样执行下去,它会在 reserve 这里阻塞不动,直到有消息产生才会往下走,所以大可放心使用,我的项目代码里面是使用了方法调用方法自身去实现循环的。
就是这样的代码,供参考:
public function watchJob()
{
$job = $this->pheanstalk->watch( config( 'tube' ) )->ignore( 'default' )->reserve();
if ( $job !== false )
{
$job_data = $job->getData();
$this->subscribe( $job_data );
$this->pheanstalk->delete( $job );
/* 继续 Watch 下一个 job */
$this->watchJob();
}
else
{
$this->log->error( 'reserve false', 'reserve false' );
}
}
监控 beanstalkd 状态
?php
//监控服务状态
require_once('./vendor/autoload.php');
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
$isAlive = $pheanstalk->getConnection()->isServiceListening();
var_dump( $isAlive );
可以配合 email 做一个报警邮件,脚本每分钟去执行,判断状态是 false,就给管理员发送邮件报警。
一些相关命令
查看 beanstalkd 服务内存占用
top -u beanstalkd
后台运行 consumer 脚本
nohup php googlehome_subscribe.php
查看 consumer 脚本运行时间
ps -A -opid,stime,etime,args | grep consumer.php
手工重启 consumer 脚本
ps auxf|grep 'googlehome_subscribe.php'|grep -v grep|awk '{print $2}'|xargs kill -9
nohup php googlehome_subscribe.php
一些总结
php 要把错误日志打开,方便收集 consumer 脚本 crash 的 log,脚本跑出一些致命的 error 一定要及时修复,因为一旦有错就会挂掉,这会影响你脚本的可用性,后期稳定之后可以上 supervisor 这种进程管理程序来管控脚本生命周期。
一些网络请求操作,一定要 try catch 到所有错误,一旦没有 catch 到,脚本就崩。我用的是 Guzzle 去做的网络请求,下面是我 catch 的一些错误,代码片段供参考。
try
{
/* TODO: 逻辑操作 */
}
catch ( ClientException $e )
{
$results['mid'] = $this->mid;
$results['code'] = $e->getResponse()->getStatusCode();
$results['reason'] = $e->getResponse()->getReasonPhrase();
$this->log->error( 'properties-changed ClientException', $results );
}
catch ( ServerException $e )
{
$results['mid'] = $this->mid;
$results['code'] = $e->getResponse()->getStatusCode();
$results['reason'] = $e->getResponse()->getReasonPhrase();
$this->log->error( 'properties-changed ServerException', $results );
}
catch ( ConnectException $e )
{
$results['mid'] = $this->mid;
$this->log->error( 'properties-changed ConnectException', $results );
}
job 消费之后一定要删除掉,如果长时间不删除,php 客户端会有 false 返回,是因为有 DEADLINE_SOON 这个超时错误产生,所以处理完任务,一定要记得删除,这一点跟 kafka 不一样,beanstalkd 需要开发者自己去删除 job。
以上就是PHP7生产环境队列Beanstalkd用法详解的详细内容,更多关于PHP7中Beanstalkd正确用法的资料请关注脚本之家其它相关文章!