本文共 3776 字,大约阅读时间需要 12 分钟。
beanstalk 消息队列 小结 协议说明和各状态转换情况 基本知识点: 1. 对于beanstalk 消息队列中每条数据都为 job 2. beanstalk service端 ,会维护 tubes[多个管道] 3. client端可以监听,使用多 tube 4. client端可以指定 use 管道[ client生成一个新的job时会把此job提交到 指定管道] 5. client端可以指定 watch 管道 [ client接收处理job时会到 指定管道得到待处理的job] 官方示意图: put reserve delete -----> [READY] ---------> [RESERVED] --------> *poof* 一般情况: 1. 任务提交到service端,job 管理放入内存空间并为其标记状态 [READY] 2. client通过轮训竞争得到次状态, job 改为 [RESERVED] 2.1 当在默认时间 120 秒内没处理完 , job.stats.timeouts 就会大于 0 同时其他 轮训竞争client会拿到这个job【 注意了 每次timeouts时,在轮训的客户端就会得到次job,状态都为 ready,timeouts>0 】 3. 随便其中一台client处理完 job.delete , 其他 client 中的此job 都会 *poof* deom - python beanstalkc 中 job.stats 参考: 使用 easy_install beanstalkc API 参考 : http://github.com/earl/beanstalkc/blob/master/TUTORIAL 刚生成的 beanstalk {'buries': 0, 'releases': 0, 'tube': 'default', 'timeouts': 0, 'ttr': 120, 'age': 6, 'pri': 2147483648L, 'delay': 0, ' state ': ' reserved ', ' time-left ': 114 , 'kicks': 0, 'id': 2} 以timeout了的 beanstalk,并且在其他client轮训到 job {'buries': 0, 'releases': 0, 'tube': 'default', 'timeouts': 1, 'ttr': 120, 'age': 417, 'pri': 2147483648L, 'delay': 0, ' state ': ' reserved ', ' time-left ': 110 , 'kicks': 0, 'id': 2} {'buries': 0, 'releases': 0, 'tube': 'default', 'timeouts': 1, 'ttr': 120, 'age': 415, 'pri': 2147483648L, 'delay': 0, ' state ': ' reserved ', ' time-left ': 4294967163L , 'kicks': 0, 'id': 2} 当没所有client 的 job 都到期 了 状态 {'buries': 0, 'releases': 0, 'tube': 'default', 'timeouts': 2, 'ttr': 120, 'age': 417, 'pri': 2147483648L, 'delay': 0, ' state ': ' ready ', ' time-left ': 4294967161L , 'kicks': 0, 'id': 2} {'buries': 0, 'releases': 0, 'tube': 'default', 'timeouts': 2, 'ttr': 120, 'age': 415, 'pri': 2147483648L, 'delay': 0, ' state ': ' ready ', ' time-left ': 4294967163L , 'kicks': 0, 'id': 2} 其中 client1 job.delete client1 job.stats *poof* client2 job.stats *poof* 比较全的状态说明 - [官方文档] http://github.com/kr/beanstalkd/blob/v1.1/doc/protocol.txt?raw=true 官方示意图: 先简单说明下(完全自己理解的,欢迎拍砖。本人E人太差~看官档费劲,谅解下): job.stats状态 = [READY] 待处理, [RESERVED] 正处理, [DELAYED]延迟状态 , [BURIED] 隐藏状态 1. 延迟提交 py.client1.put>>> beanstalk.put('yes!', delay=10) py.client3.reserve>>> job = beanstalk.reserve() # 等待 10 秒 2. 管道测试 put-job到service端 可以指定 put的tube管道 如: py.client1.put>>> beanstalk.use('foo') py.client1.put>>> beanstalk.put('hey!') py.client2.reserve>>> job = beanstalk.reserve() # 一直拥塞,应为 他 watch 管道 'default' py.client3.reserve>>> beanstalk.watch('foo') # beanstalk.ignore('bar') 放弃监听 bar py.client3.reserve>>> job = beanstalk.reserve() py.client3.reserve>>> job.body #输出 'hey!' 3. 隐藏状态 现在吧 client 1/2/3 的 use watch 的管道都调回 default py.client2.reserve>>> job = beanstalk.reserve() py.client3.reserve>>> job = beanstalk.reserve() py.client1.put>>> beanstalk.put('隐藏状态!') py.client2.reserve>>> job.bury() #2 轮训得到 并且 修改 job 为隐藏状态 # 120 秒后 client3 没有轮训得到 此job py.client2.reserve>>> job.stats() {'buries': 1, 'releases': 0, 'tube': 'default', 'timeouts': 0, 'ttr': 120, 'age': 188, 'pri': 2147483648L, 'delay': 0, 'state': 'buried', 'time-left': 4294967228L, 'kicks': 0, 'id': 11} py.client2.reserve>>> beanstalk.kick( job.stats()['id'] ) #修改状态为 reserved # 立刻 client3 得到 job py.client3.reserve>>> job.stats() {'buries': 1, 'releases': 0, 'tube': 'default', 'timeouts': 0, 'ttr': 120, 'age': 313, 'pri': 2147483648L, 'delay': 0, 'state': 'reserved', 'time-left': 110, 'kicks': 1, 'id': 11} # 这时候 client2 / 3 同时 有 job 11 状态 'buries': 1,'timeouts': 0,'state': 'reserved' 4. peek 窥见 可以得到 一个 stats - read 的 job ,其他 client 可以 job = beanstalk.reserve() 后马上 job.stats 会变成 [RESERVED] py.client2.reserve>>> job = beanstalk.peek_ready() 取得 job 并看 本 client 能 处理能 >>> job = beanstalk.peek(3) >>> job.body 'yes!' >>> job.stats()['state'] 'ready' 这种形式西 job 不能 bury 等修改状态,但 可以 delete peek 系类 peek_buried peek_ready
本文转自博客园刘凯毅的博客,原文链接:,如需转载请自行联系原博主。