AMQP協議

當前各種應用大量使用異步消息模型,並隨之產生眾多消息中間件產品及協議,標準的不一致使應用與中間件之間的耦合限制產品的選擇,並增加維護成本。 AMQP是一個提供統一消息服務的應用層標準協議,基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件不同產品,不同開發語言等條件的限制。
當然這種降低耦合的機制是基於與上層產品,語言無關的協議。 AMQP協議是一種二進制協議,提供客戶端應用與消息中間件之間異步、安全、高效地交互。從整體來看,AMQP協議可劃分為三層:
這種分層架構類似於OSI網絡協議,可替換各層實現而不影響與其它層的交互。 AMQP定義了合適的服務器端域模型,用於規範服務器的行為(AMQP服務器端可稱為broker)。在這裡Model層決定這些基本域模型所產生的行為,這種行為在AMQP中用”command”表示,在後文中會著重來分析這些域模型。 Session層定義客戶端與broker之間的通信(通信雙方​​都是一個peer,可互稱做partner),為command的可靠傳輸提供保障。 Transport層專注於數據傳送,並與Session保持交互,接受上層的數據,組裝成二進制流,傳送到receiver後再解析數據,交付給Session層。 Session層需要Transport層完成網絡異常情況的匯報,順序傳送command等工作。
上面是對AMQP協議的大致說明。下面會以我們對消息服務的需求來理解AMQP所提供的域模型。
消息中間件的主要功能是消息的路由(Routing)和緩存(Buffering)。在AMQP中提供類似功能的兩種域模型:Exchange 和Message queue。
Exchange接收消息生產者(Message Producer)發送的消息根據不同的路由算法將消息發送往Message queue。 Message queue會在消息不能被正常消費時緩存這些消息,具體的緩存策略由實現者決定,當message queue與消息消費者(Message consumer)之間的連接通暢時,Message queue有將消息轉發到consumer的責任。
Message是當前模型中所操縱的基本單位,它由Producer產生,經過Broker被Consumer所消費。它的基本結構有兩部分: Header和Body。 Header是由Producer添加上的各種屬性的集合,這些屬性有控制Message是否可被緩存,接收的queue是哪個,優先級是多少等。 Body是真正需要傳送的數據,它是對Broker不可見的二進制數據流,在傳輸過程中不應該受到影響。
一個broker中會存在多個Message queue,Exchange怎樣知道它要把消息發送到哪個Message queue中去呢? 這就是上圖中所展示Binding的作用。 Message queue的創建是由client application控制的,在創建Message queue後需要確定它來接收並保存哪個Exchange路由的結果。 Binding是用來關聯Exchange與Message queue的域模型。 Client application控制Exchange與某個特定Message queue關聯,並將這個queue接受哪種消息的條件綁定到Exchange,這個條件也叫Binding key 或是Criteria。
在與多個Message queue關聯後,Exchange中就會存在一個路由表,這個表中存儲著每個Message queue所需要消息的限制條件。 Exchange就會檢查它接受到的每個Message的Header及Body信息,來決定將Message路由到哪個queue中去。 Message的Header中應該有個屬性叫Routing Key,它由Message發送者產生,提供給Exchange路由這條Message的標準。 Exchange根據不同路由算法有不同有Exchange Type。比如有Direct類似,需要Binding key 等於Routing key;也有Binding key與Routing key符合一個模式關係;也有根據Message包含的某些屬性來判斷。一些基礎的路由算法由AMQP所提供,client application也可以自定義各種自己的擴展路由算法。 那麼一個Message的處理流程類似於這樣:
在這裡有個新名詞需要介紹: Virtual Host。一個Virtual Host可持有一些Exchange和Message queue。它是一個虛擬概念,一個Virtual Host可以是一台服務器,也可以是由多台服務器組成的集群。同步擴展下,Exchange與Message queue的部署也可以是一台或是多台服務器上。
Message的產生者和消費者可能是同一個應用。整個AMQP定義的就是Client application與Broker之間的交互。在粗略介紹完AMQP的域模型後,可以關注下Client是怎樣與Broker建立起連接的。
在AMQP中,Client application想要與Broker溝通,就需要建立起與Broker的connection,這種connection其實是與Virtual Host相關聯的,也就是說,connection是建立在client與Virtual Host之間。可以在一個connection上並發運行多個channel,每個channel執行與Broker的通信,我們前面提供的session就是依附於channel上的。
這裡的Session可以有多種定義,既可以表示AMQP內部提供的command分發機制,也可以說是在宏觀上區別與域模型的接口。正常理解就是我們平時所說的交互context,主要作用就是在網絡上可靠地傳遞每一個command。在AMQP的設計中,應當是藉鑑了TCP的各種​​設計,用於保證這種可靠性。
在Session層,為上層所需要交互的每個command分配一個惟一標識符(可以是一個UUID),是為了在傳輸過程中可以對command做校驗和重傳。 Command發送端也需要記錄每個發送出去的command到Replay Buffer,以期得到接收方的回饋,保證這個command被接收方明確地接收或是已執行這個command。對於超時沒有收到反饋的command,發送方再次重傳。如果接收方已明確地回饋信息想要告知command發送方但這條信息在中途丟失或是其它問題發送方沒有收到,那麼發送方不斷重傳會對接收方產生影響,為了降低這種影響, command接收方設置一個過濾器Idempotency Barrier,來攔截那些已接收過的command。

留言

這個網誌中的熱門文章

Json概述以及python對json的相關操作

利用 Keepalived 提供 VIP

Docker容器日誌查看與清理