簡介:
這兩天看了消息隊列通信,打算在配置平台上應用起來。以前用過 zeromq 但是這東西太快了,還有就是 rabbitmq 有點大,新浪的朋友推薦了 qpid,簡單輕便。自己總結了下文檔,大家可以瞅瞅。
AMQP(消息隊列協議 Advanced Message Queuing Protocol)是一種消息協議 ,等同於 JMS,但是 JMS 只是 java 平台的方案,AMQP 是一個跨語言的協議。
AMQP 不分語言平台,主流的語言都支持,運維這邊的 perl,python,ruby 更是支持,所以大家就放心用吧。
主流的消息隊列通信類型:
點對點:A 發消息給 B。廣播:A 發給所有其他人的消息組播:A 發給多個但不是所有其他人的消息。Requester/response:類似訪問網頁的通信方式,客戶端發請求並等待,服務端回復該請求Pub-sub:類似雜誌發行,出版雜誌的人並不知道誰在看這本雜誌,訂閱的人並不關心誰在發表這本雜誌。出版的人只管將信息發佈出去,訂閱的人也只在需要的時候收到該信息。Store-and-forward:存儲轉發模型類似信件投遞,寫信的人將消息寫給某人,但在將信件發出的時候,收信的人並不一定在家等待,也並不知道有消息給他。但這個消息不會丟失,會放在收信者的信箱中。這種模型允許信息的異步交換。其他通信模型。。。
Publisher --->Exchange ---> MessageQueue --->Consumer
整個過程是異步的.Publisher,Consumer 相互不知道對方的存在,Exchange 負責交換/路由,依靠 Routing Key,每個消息者有一個 Routing Key,每個 Binding 將自已感興趣的 RoutingKey 告訴 Exchange,以便 Exchange 將相關的消息轉發給相應的 Queue!
幾個概念
幾個概念Producer,Routing Key,Exchange,Binding,Queue,Consumer.Producer: 消息的創建者,消息的發送者Routing Key:唯一用來映射消息該進入哪個隊列的標識Exchange:負責消息的路由,交換Binding:定義 Queue 和 Exchange 的映射關係Queue:消息隊列Consumer:消息的使用者Exchange類型Fan-Out:類似於廣播方式,不管 RoutingKeyDirect:根據 RoutingKey,進行關聯投寄Topic:類似於 Direct,但是支持多個 Key 關聯,以組的方式投寄。 key以.來定義界限。類似於 usea.news,usea.weather.這兩個消息是一組的。
圖片 28.1 pic
QPID
Qpid 是 Apache 開發的一款面向對象的消息中間件,它是一個 AMQP 的實現,可以和其他符合 AMQP 協議的系統進行通信。Qpid 提供了 C++/Python/Java/C# 等主流編程語言的客戶端庫,安裝使用非常方便。相對於其他的 AMQP 實現,Qpid 社區十分活躍,有望成為標準 AMQP 中間件產品。除了符合 AMQP 基本要求之外,Qpid 提供了很多額外的 HA 特性,非常適於集群環境下的消息通信!
基本功能外提供以下特性:
採用 Corosync(?)來保證集群環境下的 Fault-tolerant(?) 特性
支持 XML 的 Exchange,消息為 XML 時,彩用 Xquery 過濾
支持 plugin
提供安全認證,可對 producer/consumer 提供身份認證
qpidd --port --no-data-dir --auth
port:端口
--no-data-dir:不指定數據目錄
--auth:不啟用安全身份認證
啟動後自動創建一些 Exchange,amp.topic,amp.direct,amp.fanout
tools:
Qpid-config:維護 Queue,Exchange,內部配置Qpid-route:配置 broker Federation(聯盟?集群?)Qpid-tool:監控
咱們說完介紹了,這裡就趕緊測試下。
服務器端的安裝:
yum install qpid-cpp-serveryum install qpid-tools/etc/init.d/qpidd start
發佈端的測試代碼:
圖片 28.2 pic
一些測試代碼轉自: ibm 的開發社區
#!/usr/bin/env python #xiaorui.cc #轉自 ibm 開發社區import optparse, timefrom qpid.messaging import *from qpid.util import URLfrom qpid.log import enable, DEBUG, WARNdef nameval(st): idx = st.find("=") if idx >= 0: name = st[0:idx] value = st[idx+1:] else: name = st value = None return name, valueparser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS [ CONTENT ... ]", description="Send messages to the supplied address.")parser.add_option("-b", "--broker", default="localhost", help="connect to specified BROKER (default %default)")parser.add_option("-r", "--reconnect", action="store_true", help="enable auto reconnect")parser.add_option("-i", "--reconnect-interval", type="float", default=3, help="interval between reconnect attempts")parser.add_option("-l", "--reconnect-limit", type="int", help="maximum number of reconnect attempts")parser.add_option("-c", "--count", type="int", default=1, help="stop after count messages have been sent, zero disables (default %default)")parser.add_option("-t", "--timeout", type="float", default=None, help="exit after the specified time")parser.add_option("-I", "--id", help="use the supplied id instead of generating one")parser.add_option("-S", "--subject", help="specify a subject")parser.add_option("-R", "--reply-to", help="specify reply-to address")parser.add_option("-P", "--property", dest="properties", action="append", default=, metavar="NAME=VALUE", help="specify message property")parser.add_option("-M", "--map", dest="entries", action="append", default=, metavar="KEY=VALUE", help="specify map entry for message body")parser.add_option("-v", dest="verbose", action="store_true", help="enable logging")opts, args = parser.parse_argsif opts.verbose: enable("qpid", DEBUG)else: enable("qpid", WARN)if opts.id is None: spout_id = str(uuid4)else: spout_id = opts.idif args: addr = args.pop(0)else: parser.error("address is required")content = Noneif args: text = " ".join(args)else: text = Noneif opts.entries: content = {} if text: content["text"] = text for e in opts.entries: name, val = nameval(e) content[name] = valelse: content = textconn = Connection(opts.broker, reconnect=opts.reconnect, reconnect_interval=opts.reconnect_interval, reconnect_limit=opts.reconnect_limit)try: conn.open ssn = conn.session snd = ssn.sender(addr) count = 0 start = time.time while (opts.count == 0 or count < opts.count) and (opts.timeout is None or time.time - start < opts.timeout): msg = Message(subject=opts.subject, reply_to=opts.reply_to, content=content) msg.properties["spout-id"] = "%s:%s" % (spout_id, count) for p in opts.properties: name, val = nameval(p) msg.properties[name] = val snd.send(msg) count += 1 print msgexcept SendError, e: print eexcept KeyboardInterrupt: passconn.close
客戶端的測試代碼:
圖片 28.3 pic
#!/usr/bin/env python #xiaorui.cc ##轉自 ibm 開發社區import optparsefrom qpid.messaging import *from qpid.util import URLfrom qpid.log import enable, DEBUG, WARNparser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...", description="Drain messages from the supplied address.")parser.add_option("-b", "--broker", default="localhost", help="connect to specified BROKER (default %default)")parser.add_option("-c", "--count", type="int", help="number of messages to drain")parser.add_option("-f", "--forever", action="store_true", help="ignore timeout and wait forever")parser.add_option("-r", "--reconnect", action="store_true", help="enable auto reconnect")parser.add_option("-i", "--reconnect-interval", type="float", default=3, help="interval between reconnect attempts")parser.add_option("-l", "--reconnect-limit", type="int", help="maximum number of reconnect attempts")parser.add_option("-t", "--timeout", type="float", default=0, help="timeout in seconds to wait before exiting (default %default)")parser.add_option("-p", "--print", dest="format", default="%(M)s", help="format string for printing messages (default %default)")parser.add_option("-v", dest="verbose", action="store_true", help="enable logging")opts, args = parser.parse_argsif opts.verbose: enable("qpid", DEBUG)else: enable("qpid", WARN)if args: addr = args.pop(0)else: parser.error("address is required")if opts.forever: timeout = Noneelse: timeout = opts.timeoutclass Formatter: def __init__(self, message): self.message = message self.environ = {"M": self.message, "P": self.message.properties, "C": self.message.content} def __getitem__(self, st): return eval(st, self.environ)conn = Connection(opts.broker, reconnect=opts.reconnect, reconnect_interval=opts.reconnect_interval, reconnect_limit=opts.reconnect_limit)try: conn.open ssn = conn.session rcv = ssn.receiver(addr) count = 0 while not opts.count or count < opts.count: try: msg = rcv.fetch(timeout=timeout) print opts.format % Formatter(msg) count += 1 ssn.acknowledge except Empty: breakexcept ReceiverError, e: print eexcept KeyboardInterrupt: passconn.close
Browse 模式的意思是,瀏覽的意思,一個特殊的需求,我訪問了一次,別人也能訪問。
Consume 模式的意思是,我瀏覽了一次後,刪除這一條。別人就訪問不到啦。
這個是瀏覽模式:
圖片 28.4 pic
sub-pub 通信的例子
Pub-sub 是另一種很有用的通信模型。恐怕它的名字就源於出版發行這種現實中的信息傳遞方式吧,publisher 就是出版商,subscriber 就是訂閱者。
服務端qpid-config add exchange topic news-service./spout news-service/news xiaorui.cc客戶端:./drain -t 120 news-service/#.news
PUB 端,創建 TOPIC 點!
圖片 28.5 pic
SUB端,也就是接收端!
圖片 28.6 pic
總結:
qpid 挺好用的,比 rabbitmq 要輕型,比 zeromq 保險點! 各方面的文檔也都很健全,值得一用。話說,這三個消息隊列我也都用過,最一開始用的是 redis 的 pubsub 做日誌收集和信息通知,後來在做集群相關的項目的時候,我自己搞了一套 zeromq 的分佈式任務分發,和 saltstack 很像,當然了遠沒有萬人用的 salt 強大。 rabbitmq 的用法,更是看中他的安全和持久化,當然性能真的不咋地。
關於 qpid 的性能我沒有親自做大量的測試,但是聽朋友說,加持久化可以到 7k,不加持久化可以到 1500 左右。
本文出自 「峰雲,就她了。」 博客,謝絕轉載!