博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python使用amqp的例子
阅读量:5883 次
发布时间:2019-06-19

本文共 8227 字,大约阅读时间需要 27 分钟。

web程序需要给IPHONE手机推送消息,移动端同事写好了一个LIB,但是这个LIB使用的是阻塞IO,在APPLE服务器返回前程序是阻塞的,他们用了多线程来解决这个问题,但是webpy运行在apache里,无法进行线程管理,所以就迫切需要一个异步的机制来解决这个问题,希望做到需要发送消息时。调用一个函数,把数据扔进管道或者消息队列后就立即返回,也不管数据是否真的出去了,它相当于生产者,再有一个程序从管道或者队列中读取,进行实际的发送,相当于消费者

但是python似乎只封装了IPC(UNIX/LINUX平台进程通信规范)的匿名管道,命名管道的API,并无消息队列,而且管道似乎不易操作,因此想使用第三方的消息队列工具,那么我老大进行了技术选型,最终选择了amqp,这个消息队列组件使用erlang编写,启动了一个socket服务器,程序通过socket进行入队和出队的操作,不过这个组件应该是提供了大量的库隐藏了通信的部分,使代码看起来就像调用函数进行队列操作

 

client.py  测试客户端 向队列写入消息

#!/usr/bin/env python# -*- coding: utf-8 -*-#client.pyimport sysimport timeimport jsonfrom amqplib import client_0_8 as amqpconn = amqp.Connection(    host="localhost:5672",    userid="guest",    password="guest",    virtual_host="/",    insist=False)chan = conn.channel()i = 0while 1:    #msg = amqp.Message('Message %d' % i)        #笔记更新    '''    data = {        "noteId" : 1,        "recvUserId" : 2,        "title" : "test",        "updateUserName":"你好",        "remindCount":10,        "projectName":"11",        "token":"b258f5e3809017e371009f32eba7d72fcf51165406ce011951811b53db15b414",        "isPushed":0,        "messageType":"NoteUpdated",    }    '''    '''    data = {        "projectId": 1,        "recvUserId": 2,        "reqUserName": "testUser",        "reqUserStatus": "pending",        "remindCount": 10,        "projectName":"11",        "token":"b258f5e3809017e371009f32eba7d72fcf51165406ce011951811b53db15b414",        "messageType":"Apply",    }    '''    data = {        "projectId": 1,        "recvUserId": 2,        "reqUserName": "testUser",        "reqUserStatus": "",        "remindCount": 10,        "projectName": "11",        "token":"b258f5e3809017e371009f32eba7d72fcf51165406ce011951811b53db15b414",        "messageType":"Review",    }    s = json.dumps(data,ensure_ascii=False)    print s    msg = amqp.Message(s)    msg.properties["delivery_mode"] = 2    chan.basic_publish(msg,        exchange="sorting_room",        routing_key="testkey")    i += 1    time.sleep(1)    breakchan.close()conn.close()

 

 

server.py  读取队列

#!/usr/bin/env python# -*- coding: utf-8 -*-from amqplib import client_0_8 as amqpimport process conn = amqp.Connection(    host="localhost:5672",    userid="guest",    password="guest",    virtual_host="/",    insist=False)chan = conn.channel()chan.queue_declare(    queue="po_box",    durable=True,    exclusive=False,    auto_delete=False)chan.exchange_declare(    exchange="sorting_room",    type="direct",    durable=True,    auto_delete=False,)chan.queue_bind(    queue="po_box",    exchange="sorting_room",    routing_key="testkey")def recv_callback(msg):    #TODO    #print msg.body    process.starup(msg.body)chan.basic_consume(    queue='po_box',    no_ack=True,    callback=recv_callback,    consumer_tag="testtag")while True:    chan.wait()#chan.basic_cancel("testtag")#chan.close()#conn.close()

 

process.py 业务逻辑模块

#!/usr/bin/env python# -*- coding: utf-8 -*-import jsonimport timeimport threadfrom APNSWrapper import *#---------------------------------------------------------------------------------------------'''从外部的数据转换成为标准数据'''def convert(args):    print args    data = json.loads(args)    return data'''线程入口'''def threadMain(args):    data = convert(args)    msg = messagesFactory(data)    msg.fillMessages(data)    msg.push()    #---------------------------------------------------------------------------------------------'''业务入口'''def startup(args):    thread.start_new_thread(threadMain,(args,))#---------------------------------------------------------------------------------------------'''消息工厂'''def messagesFactory(data):    if data["messageType"] == "NoteUpdated":        return NoteUpdateMessages()    if data["messageType"] == "Apply":        return ApplyMessages()    if data["messageType"] == "Review":        return ReviewMessages()    #---------------------------------------------------------------------------------------------'''消息基类'''class Messages:    def __init__(self):        self.Messages = []    def makeMessage(self,data):        pass    def fillMessages(self,data):        message = self.makeMessage(data)        self.Messages.append(message)    def push(self):        try:            wrapper = APNSNotificationWrapper('/etc/ck.pem', True)            for message in self.Messages:                wrapper.append(message)            wrapper.notify()            print 'send success'        except Exception, e:            raise#---------------------------------------------------------------------------------------------'''笔记更新消息类'''class NoteUpdateMessages(Messages):    def makeMessage(self,data):        token = data["token"]        deviceToken = token.decode('hex')        message = APNSNotification()        message.token(deviceToken)        for key in ['updateUserName','title']:            if type(data[key]) is unicode:                data[key] = data[key].encode('utf8')        alert = '共%d篇更新 %s编辑了“%s”' % (int(data["remindCount"]), data["updateUserName"], data["title"])        if data["isPushed"] == 0:            message.alert(alert)        message.badge(int(data["remindCount"]))        message.sound('qing.caf')            property1 = APNSProperty("NOTE_ID", int(data["noteId"]) )        message.appendProperty(property1)        property2 = APNSProperty("USER_ID", int(data["recvUserId"]) )        message.appendProperty(property2)        property3 = APNSProperty("REMIND_TIME", int(time.time()) )        message.appendProperty(property3)        message.appendProperty(APNSProperty("REMIND_TYPE", 1 ))        return message#---------------------------------------------------------------------------------------------'''申请加入项目消息类'''class ApplyMessages(Messages):    def makeMessage(self,data):        token = data["token"]        deviceToken = token.decode('hex')        message = APNSNotification()        message.token(deviceToken)        alertMsg = ''        status = data["reqUserStatus"]        if status == 'pending':            alertMsg = '%s申请加入%s群组,请审批' % (data["reqUserName"].encode('utf-8'), data["projectName"].encode('utf-8'),)        elif status == 'active':            alertMsg = '%s已加入%s群组' % (data["reqUserName"].encode('utf-8'), data["projectName"].encode('utf-8'),)        elif status == 'deny':            alertMsg = '%s被拒绝加入%s群组' % (data["reqUserName"].encode('utf-8'), data["projectName"].encode('utf-8'),)        elif status == 'removed':            alertMsg = '%s已从%s群组移除' % (data["reqUserName"].encode('utf-8'), data["projectName"].encode('utf-8'),)        message.alert(alertMsg)        message.badge(int(data["remindCount"]))        message.sound('qing.caf')            message.appendProperty(APNSProperty("PROJECT_ID", int(data["projectId"]) ))        message.appendProperty(APNSProperty("USER_ID", int(data["recvUserId"]) ))        message.appendProperty(APNSProperty("REMIND_TIME", int(time.time()) ))        message.appendProperty(APNSProperty("REMIND_TYPE", 3 ))        message.appendProperty(APNSProperty("REMIND_MSG", alertMsg ))        return message#---------------------------------------------------------------------------------------------'''审核结果消息类'''class ReviewMessages(Messages):    def makeMessage(self,data):        token = data["token"]        deviceToken = token.decode('hex')        message = APNSNotification()        message.token(deviceToken)            alertMsg = ''        status = data["reqUserStatus"]        if status == 'active':            alertMsg = '您已加入%s群组' % (data["projectName"].encode('utf-8'),)        elif status == 'deny':            alertMsg = '您被拒绝加入%s群组' % (data["projectName"].encode('utf-8'),)        elif status == 'removed':            alertMsg = '您已从%s群组移除' % (data["projectName"].encode('utf-8'),)        message.alert(alertMsg)        message.badge(int(data["remindCount"]))        message.sound('qing.caf')            message.appendProperty(APNSProperty("PROJECT_ID", int(data["projectId"]) ))        message.appendProperty(APNSProperty("USER_ID", int(data["recvUserId"]) ))        message.appendProperty(APNSProperty("REMIND_TIME", int(time.time()) ))        message.appendProperty(APNSProperty("REMIND_TYPE", 2 ))        message.appendProperty(APNSProperty("REMIND_MSG", alertMsg ))        return message

转载于:https://www.cnblogs.com/code-style/archive/2012/06/11/2544830.html

你可能感兴趣的文章
我的架构经验系列文章 - 后端架构 - 语言层面
查看>>
DEFERRED_SEGMENT_CREATION
查看>>
读取手机硬件信息
查看>>
一致哈希
查看>>
spring3: Bean的作用域
查看>>
The connection to adb is down, and a severe error has occured. 问题解决
查看>>
在Jenkins中配置运行远程shell命令
查看>>
代码杂记
查看>>
linux中防CC攻击两种实现方法(转)
查看>>
《Programming WPF》翻译 第9章 4.模板
查看>>
hdu2159
查看>>
Windows7+VS2012下OpenGL 4的环境配置
查看>>
Maven for Eclipse 第一章 ——Maven的介绍
查看>>
Linux Kernel中断子系统来龙去脉浅析【转】
查看>>
Linux NFS服务器的安装与配置
查看>>
Ada boost学习
查看>>
Unity中SendMessage和Delegate效率比较
查看>>
Linux下EPoll通信模型简析
查看>>
react-native 制作购物车ShopCart
查看>>
Linux服务器 java生成的图片验证码乱码问题
查看>>