分佈式IM(即時通訊) 系統
老讀者應該還記得我在去年國慶節前分享過一篇《設計一個百萬級的消息推送系統》;雖然我在文中有貼一些偽代碼,依然有些朋友希望能直接分享一些可以運行的源碼;這麼久了是時候把坑填上了。
目錄結構:
目錄結構:
於是在之前的基礎上我完善了一些內容,先來看看這個項目的介紹吧:
CIM(CROSS-IM)
一款面向開發者的IM(即时通讯)
系統;同時提供了一些組件幫助開發者構建一款屬於自己可水平擴展的IM
。
借助
CIM
你可以實現以下需求:IM
即時通訊系統。- 適用於
APP
的消息推送中間件。 IOT
海量連接場景中的消息透傳中間件。
完整源碼託管在GitHub : github.com/crossoverJi…
架構設計
下面來看看具體的架構設計。
CIM
中的各個組件均採用SpringBoot
構建。- 採用
Netty + Google Protocol Buffer
構建底層通信。 Redis
存放各個客戶端的路由信息、賬號信息、在線狀態等。Zookeeper
用於IM-server
服務的註冊與發現。
整體主要由以下模塊組成:
cim-server
IM
服務端;用於接收client
連接、消息透傳、消息推送等功能。
支持集群部署。
cim-forward-route
消息路由服務器;用於處理消息路由、消息轉發、用戶登錄、用戶下線以及一些運營工具(獲取在線用戶數等)。
cim-client
IM
客戶端;給用戶使用的消息終端,一個命令即可啟動並向其他人發起通訊(群聊、私聊);同時內置了一些常用命令方便使用。流程圖
整體的流程也比較簡單,流程圖如下:
- 客戶端向
route
發起登錄。 - 登錄成功從
Zookeeper
中選擇可用IM-server
返回給客戶端,並保存登錄、路由信息到Redis
。 - 客戶端向
IM-server
發起長連接,成功後保持心跳。 - 客戶端下線時通過
route
清除狀態信息。
所以當我們自己部署時需要以下步驟:
- 搭建基礎中間件
Redis、Zookeeper
。 - 部署
cim-server
,這是真正的IM服務器,為了滿足性能需求所以支持水平擴展,只需要註冊到同一個Zookeeper
即可。 - 部署
cim-forward-route
,這是路由服務器,所有的消息都需要經過它。由於它是無狀態的,所以也可以利用Nginx
代理提高可用性。 cim-client
真正面向用戶的客戶端;啟動之後會自動連接IM 服務器便可以在控制台收發消息了。
詳細設計
接下來重點看看具體的實現,比如群聊、私聊消息如何流轉;IM 服務端負載均衡;服務如何註冊發現等等。
IM 服務端
先來看看服務端;主要是實現客戶端上下線、消息下發等功能。
首先是服務啟動:
由於是在
SpringBoot
中搭建的,所以在應用啟動時需要啟動Netty
服務。
從
pipline
中可以看出使用了Protobuf
的編解碼(具體報文在客戶端中分析)。註冊發現
需要滿足
IM
服務端的水平擴展需求,所以cim-server
是需要將自身數據發佈到註冊中心的。
所以在應用啟動成功後需要將自身數據註冊到
Zookeeper
中。
最主要的目的就是將當前應用的
ip + cim-server-port+ http-port
註冊上去。
上圖是我在演示環境中註冊的兩個
cim-server
實例(由於在一台服務器,所以只是端口不同)。
這樣在客戶端(監聽這個
Zookeeper
節點)就能實時的知道目前可用的服務信息。登錄
當客戶端請求
cim-forward-route
中的登錄接口(詳見下文)做完業務驗證(就相當於日常登錄其他網站一樣)之後,客戶端會向服務端發起一個長連接,如之前的流程所示:
這時客戶端會發送一個特殊報文,表明當前是登錄信息。
服務端收到後就需要將該客戶端的
userID
和當前Channel
通道關係保存起來。
同時也緩存了用戶的信息,也就是
userID
和用戶名。離線
當客戶端斷線後也需要將剛才緩存的信息清除掉。
同時也需要調用
route
接口清除相關信息(具體接口看下文)。IM 路由
從架構圖中可以看出,路由層是非常重要的一環;它提供了一系列的
HTTP
服務承接了客戶端和服務端。
目前主要是以下幾個接口。
註冊接口
由於每一個客戶端都是需要登錄才能使用的,所以第一步自然是註冊。
這裡就設計的比較簡單,直接利用
Redis
來存儲用戶信息;用戶信息也只有ID
和userName
而已。
只是為了方便查詢在
Redis
中的KV
又反過來存儲了一份VK
,這樣ID
和userName
都必須唯一。登錄接口
這裡的登錄和
cim-server
中的登錄不一樣,具有業務性質,- 登錄成功之後需要判斷是否是重複登錄(一個用戶只能運行一個客戶端)。
- 登錄成功後需要從
Zookeeper
中獲取服務列表(cim-server
)並根據某種算法選擇一台服務返回給客戶端。 - 登錄成功之後還需要保存路由信息,也就是當前用戶分配的服務實例保存到
Redis
中。
為了實現只能一個用戶登錄,使用了
Redis
中的set
來保存登錄信息;利用userID
作為key
,重複的登錄就會寫入失敗。類似於Java 中的HashSet,只能去重保存。
獲取一台可用的路由實例也比較簡單:
- 先從
Zookeeper
獲取所有的服務實例做一個內部緩存。 - 輪詢選擇一台服務器(目前只有這一種算法,後續會新增)。
當然要獲取
Zookeeper
中的服務實例前自然是需要監聽cim-server
之前註冊上去的那個節點。
具體代碼如下:
也是在應用啟動之後監聽
Zookeeper
中的路由節點,一旦發生變化就會更新內部緩存。這裡使用的是Guava的cache,它基於ConcurrentHashMap
,所以可以保證清除、新增缓存
的原子性。
群聊接口
這是一個真正發消息的接口,實現的效果就是其中一個客戶端發消息,其餘所有客戶端都能收到!
流程肯定是客戶端發送一條消息到服務端,服務端收到後在上文介紹的
SessionSocketHolder
中遍歷所有Channel
(通道)然後下發消息即可。
服務端是單機倒也可以,但現在是集群設計。所以所有的客戶端會根據之前的輪詢算法分配到不同的
cim-server
實例中。
因此就需要路由層來發揮作用了。
路由接口收到消息後首先遍歷出所有的客戶端和服務實例的關係。
路由關係在
Redis
中的存放如下:
由於
Redis
單線程的特質,當數據量大時;一旦使用keys匹配所有cim-route:*
數據,會導致Redis不能處理其他請求。
所以這裡改為使用scan命令來遍歷所有的
cim-route:*
。
接著會挨個調用每個客戶端所在的服務端的
HTTP
接口用於推送消息。
在
cim-server
中的實現如下:cim-server
收到消息後會在內部緩存中查詢該userID 的通道,接著只需要發消息即可。在線用戶接口
這是一個輔助接口,可以查詢出當前在線用戶信息。
實現也很簡單,也就是查詢之前保存”用戶登錄狀態的那個去重
set
“即可。私聊接口
之所以說獲取在線用戶是一個輔助接口,其實就是用於輔助私聊使用的。
一般我們使用私聊的前提肯定得知道當前哪些用戶在線,接著你才會知道你要和誰進行私聊。
類似於這樣:
在我們這個場景中,私聊的前提就是需要獲得在線用戶的
userID
。
所以私聊接口在收到消息後需要查詢到接收者所在的
cim-server
實例信息,後續的步驟就和群聊一致了。調用接收者所在實例的HTTP
接口下發信息。
只是群聊是遍歷所有的在線用戶,私聊只發送一個的區別。
下線接口
一旦客戶端下線,我們就需要將之前存放在
Redis
中的一些信息刪除掉(路由信息、登錄狀態)。IM 客戶端
客戶端中的一些邏輯其實在上文已經談到一些了。
登錄
第一步也就是登錄,需要在啟動時調用
route
的登錄接口,獲得cim-server
信息再創建連接。
登錄過程中
route
接口會判斷是否為重複登錄,重複登錄則會直接退出程序。
接下來是利用
route
接口返回的cim-server
實例信息(ip+port
)創建連接。
最後一步就是發送一個登錄標誌的信息到服務端,讓它保持客戶端和
Channel
的關係。自定義協議
上文提到的一些
登录报文、真正的消息报文
這些其實都是在我們自定義協議中可以區別出來的。
由於是使用
Google Protocol Buffer
編解碼,所以先看看原始格式。
其實這個協議中目前一共就三個字段:
requestId
可以理解為userId
。reqMsg
就是真正的消息。type
也就是上文提到的消息類別。
目前主要是三種類型,分別對應不同的業務:
心跳
為了保持客戶端和服務端的連接,每隔一段時間沒有發送消息都需要自動的發送心跳。
目前的策略是每隔一分鐘就是發送一個心跳包到服務端:
這樣服務端每隔一分鐘沒有收到業務消息時就會收到
ping
的心跳包:內置命令
客戶端也內置了一些基本命令來方便使用。
命令 | 描述 |
---|---|
:q | 退出客戶端 |
:olu | 獲取所有在線用戶信息 |
:all | 獲取所有命令 |
: | 更多命令正在開發中。。 |
比如輸入
:q
就會退出客戶端,同時會關閉一些系統資源。
當輸入
:olu
( onlineUser
的簡寫)就會去調用route
的獲取所有在線用戶接口。群聊
群聊的使用非常簡單,只需要在控制台輸入消息回車即可。
這時會去調用
route
的群聊接口。私聊
私聊也是同理,但前提是需要觸發關鍵字;使用
userId;;消息内容
這樣的格式才會給某個
用戶發送消息,所以一般都需要先使用
:olu
命令獲取所以在線用戶才方便使用。消息回調
為了滿足一些定制需求,比如消息需要保存之類的。
所以在客戶端收到消息之後會回調一個接口,在這個接口中可以自定義實現。
因此先創建了一個
caller
的bean
,這個bean
中包含了一個CustomMsgHandleListener
接口,需要自行處理只需要實現此接口即可。自定義界面
由於我自己不怎麼會寫界面,但保不准有其他大牛會寫。所以客戶端中的群聊、私聊、獲取在線用戶、消息回調等業務(以及之後的業務)都是以接口形式提供。
也方便後面做頁面集成,只需要調這些接口就行了;具體實現不用怎麼關心。
留言
張貼留言