更新時(shí)間:2022-09-16 來源:黑馬程序員 瀏覽量:
在之前的內(nèi)容中,我們講解了消費(fèi)者端服務(wù)發(fā)現(xiàn)與提供者端服務(wù)暴露的相關(guān)內(nèi)容,同時(shí)也知道消費(fèi)者端通過內(nèi)置的負(fù)載均衡算法獲取合適的調(diào)用invoker進(jìn)行遠(yuǎn)程調(diào)用。那么,本章節(jié)重點(diǎn)關(guān)注的就是遠(yuǎn)程調(diào)用過程即網(wǎng)絡(luò)通信。

網(wǎng)絡(luò)通信位于Remoting模塊:
- Remoting 實(shí)現(xiàn)是 Dubbo 協(xié)議的實(shí)現(xiàn),如果你選擇 RMI 協(xié)議,整個(gè) Remoting 都不會(huì)用上;
- Remoting 內(nèi)部再劃為 `Transport 傳輸層` 和 `Exchange 信息交換層`;
- Transport 層只負(fù)責(zé)單向消息傳輸,是對(duì) Mina, Netty, Grizzly 的抽象,它也可以擴(kuò)展 UDP 傳輸;
- Exchange 層是在傳輸層之上封裝了 Request-Response 語義;
網(wǎng)絡(luò)通信的問題:
客戶端與服務(wù)端連通性問題
粘包拆包問題
異步多線程數(shù)據(jù)一致問題
通信協(xié)議
dubbo內(nèi)置,dubbo協(xié)議 ,rmi協(xié)議,hessian協(xié)議,http協(xié)議,webservice協(xié)議,thrift協(xié)議,rest協(xié)議,grpc協(xié)議,memcached協(xié)議,redis協(xié)議等10種通訊協(xié)議。各個(gè)協(xié)議特點(diǎn)如下
dubbo協(xié)議
Dubbo 缺省協(xié)議采用單一長(zhǎng)連接和 NIO 異步通訊,適合于小數(shù)據(jù)量大并發(fā)的服務(wù)調(diào)用,以及服務(wù)消費(fèi)者機(jī)器數(shù)遠(yuǎn)大于服務(wù)提供者機(jī)器數(shù)的情況。
缺省協(xié)議,使用基于 mina `1.1.7` 和 hessian `3.2.1` 的 tbremoting 交互。
- 連接個(gè)數(shù):?jiǎn)芜B接
- 連接方式:長(zhǎng)連接
- 傳輸協(xié)議:TCP
- 傳輸方式:NIO 異步傳輸
- 序列化:Hessian 二進(jìn)制序列化
- 適用范圍:傳入傳出參數(shù)數(shù)據(jù)包較小(建議小于100K),消費(fèi)者比提供者個(gè)數(shù)多,單一消費(fèi)者無法壓滿提供者,盡量不要用 dubbo 協(xié)議傳輸大文件或超大字符串。
- 適用場(chǎng)景:常規(guī)遠(yuǎn)程服務(wù)方法調(diào)用
rmi協(xié)議
RMI 協(xié)議采用 JDK 標(biāo)準(zhǔn)的 `java.rmi.*` 實(shí)現(xiàn),采用阻塞式短連接和 JDK 標(biāo)準(zhǔn)序列化方式。
- 連接個(gè)數(shù):多連接
- 連接方式:短連接
- 傳輸協(xié)議:TCP
- 傳輸方式:同步傳輸
- 序列化:Java 標(biāo)準(zhǔn)二進(jìn)制序列化
- 適用范圍:傳入傳出參數(shù)數(shù)據(jù)包大小混合,消費(fèi)者與提供者個(gè)數(shù)差不多,可傳文件。
- 適用場(chǎng)景:常規(guī)遠(yuǎn)程服務(wù)方法調(diào)用,與原生RMI服務(wù)互操作
hessian協(xié)議
Hessian 協(xié)議用于集成 Hessian 的服務(wù),Hessian 底層采用 Http 通訊,采用 Servlet 暴露服務(wù),Dubbo 缺省內(nèi)嵌 Jetty 作為服務(wù)器實(shí)現(xiàn)。
Dubbo 的 Hessian 協(xié)議可以和原生 Hessian 服務(wù)互操作,即:
- 提供者用 Dubbo 的 Hessian 協(xié)議暴露服務(wù),消費(fèi)者直接用標(biāo)準(zhǔn) Hessian 接口調(diào)用
- 或者提供方用標(biāo)準(zhǔn) Hessian 暴露服務(wù),消費(fèi)方用 Dubbo 的 Hessian 協(xié)議調(diào)用。
- 連接個(gè)數(shù):多連接
- 連接方式:短連接
- 傳輸協(xié)議:HTTP
- 傳輸方式:同步傳輸
- 序列化:Hessian二進(jìn)制序列化
- 適用范圍:傳入傳出參數(shù)數(shù)據(jù)包較大,提供者比消費(fèi)者個(gè)數(shù)多,提供者壓力較大,可傳文件。
- 適用場(chǎng)景:頁面?zhèn)鬏?,文件傳輸,或與原生hessian服務(wù)互操作
http協(xié)議
基于 HTTP 表單的遠(yuǎn)程調(diào)用協(xié)議,采用 Spring 的 HttpInvoker 實(shí)現(xiàn)
- 連接個(gè)數(shù):多連接
- 連接方式:短連接
- 傳輸協(xié)議:HTTP
- 傳輸方式:同步傳輸
- 序列化:表單序列化
- 適用范圍:傳入傳出參數(shù)數(shù)據(jù)包大小混合,提供者比消費(fèi)者個(gè)數(shù)多,可用瀏覽器查看,可用表單或URL傳入?yún)?shù),暫不支持傳文件。
- 適用場(chǎng)景:需同時(shí)給應(yīng)用程序和瀏覽器 JS 使用的服務(wù)。
webservice協(xié)議
基于 WebService 的遠(yuǎn)程調(diào)用協(xié)議,基于 Apache CXF 實(shí)現(xiàn)](http://dubbo.apache.org/zh-cn/docs/user/references/protocol/webservice.html#fn2)。
可以和原生 WebService 服務(wù)互操作,即:
- 提供者用 Dubbo 的 WebService 協(xié)議暴露服務(wù),消費(fèi)者直接用標(biāo)準(zhǔn) WebService 接口調(diào)用,
- 或者提供方用標(biāo)準(zhǔn) WebService 暴露服務(wù),消費(fèi)方用 Dubbo 的 WebService 協(xié)議調(diào)用。
- 連接個(gè)數(shù):多連接
- 連接方式:短連接
- 傳輸協(xié)議:HTTP
- 傳輸方式:同步傳輸
- 序列化:SOAP 文本序列化(http + xml)
- 適用場(chǎng)景:系統(tǒng)集成,跨語言調(diào)用
thrift協(xié)議
當(dāng)前 dubbo 支持 [[1\]](http://dubbo.apache.org/zh-cn/docs/user/references/protocol/thrift.html#fn1)的 thrift 協(xié)議是對(duì) thrift 原生協(xié)議 [[2\]](http://dubbo.apache.org/zh-cn/docs/user/references/protocol/thrift.html#fn2) 的擴(kuò)展,在原生協(xié)議的基礎(chǔ)上添加了一些額外的頭信息,比如 service name,magic number 等。
rest協(xié)議
基于標(biāo)準(zhǔn)的Java REST API——JAX-RS 2.0(Java API for RESTful Web Services的簡(jiǎn)寫)實(shí)現(xiàn)的REST調(diào)用支持
grpc協(xié)議
Dubbo 自 2.7.5 版本開始支持 gRPC 協(xié)議,對(duì)于計(jì)劃使用 HTTP/2 通信,或者想利用 gRPC 帶來的 Stream、反壓、Reactive 編程等能力的開發(fā)者來說, 都可以考慮啟用 gRPC 協(xié)議。
- 為期望使用 gRPC 協(xié)議的用戶帶來服務(wù)治理能力,方便接入 Dubbo 體系
- 用戶可以使用 Dubbo 風(fēng)格的,基于接口的編程風(fēng)格來定義和使用遠(yuǎn)程服務(wù)
memcached協(xié)議
基于 memcached實(shí)現(xiàn)的 RPC 協(xié)議
redis協(xié)議
基于 Redis 實(shí)現(xiàn)的 RPC 協(xié)議
序列化
序列化就是將對(duì)象轉(zhuǎn)成字節(jié)流,用于網(wǎng)絡(luò)傳輸,以及將字節(jié)流轉(zhuǎn)為對(duì)象,用于在收到字節(jié)流數(shù)據(jù)后還原成對(duì)象。序列化的優(yōu)勢(shì)有很多,例如安全性更好、可跨平臺(tái)等。我們知道dubbo基于netty進(jìn)行網(wǎng)絡(luò)通訊,在`NettyClient.doOpen()`方法中可以看到Netty的相關(guān)類
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});然后去看NettyCodecAdapter 類最后進(jìn)入ExchangeCodec類的encodeRequest方法,如下:
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
Serialization serialization = getSerialization(channel);
// header.
byte[] header = new byte[HEADER_LENGTH];是的,就是Serialization接口,默認(rèn)是Hessian2Serialization序列化接口。

Dubbo序列化支持java、compactedjava、nativejava、fastjson、dubbo、fst、hessian2、kryo,protostuff其中默認(rèn)hessian2。其中java、compactedjava、nativejava屬于原生java的序列化。
- dubbo序列化:阿里尚未開發(fā)成熟的高效java序列化實(shí)現(xiàn),阿里不建議在生產(chǎn)環(huán)境使用它。
- **hessian2序列化:hessian是一種跨語言的高效二進(jìn)制序列化方式。但這里實(shí)際不是原生的hessian2序列化,而是阿里修改過的,它是dubbo RPC默認(rèn)啟用的序列化方式。**
- json序列化:目前有兩種實(shí)現(xiàn),一種是采用的阿里的fastjson庫,另一種是采用dubbo中自己實(shí)現(xiàn)的簡(jiǎn)單json庫,但其實(shí)現(xiàn)都不是特別成熟,而且json這種文本序列化性能一般不如上面兩種二進(jìn)制序列化。
- java序列化:主要是采用JDK自帶的Java序列化實(shí)現(xiàn),性能很不理想。
最近幾年,各種新的高效序列化方式層出不窮,不斷刷新序列化性能的上限,最典型的包括:
- 專門針對(duì)Java語言的:Kryo,F(xiàn)ST等等
- 跨語言的:Protostuff,ProtoBuf,Thrift,Avro,MsgPack等等
這些序列化方式的性能多數(shù)都顯著優(yōu)于 hessian2 (甚至包括尚未成熟的dubbo序列化)。所以我們可以為 dubbo 引入 Kryo 和 FST 這兩種高效 Java 來優(yōu)化 dubbo 的序列化。
使用Kryo和FST非常簡(jiǎn)單,只需要在dubbo RPC的XML配置中添加一個(gè)屬性即可:
<dubbo:protocol name="dubbo" serialization="kryo"/>
網(wǎng)絡(luò)通信
dubbo中數(shù)據(jù)格式
解決socket中數(shù)據(jù)粘包拆包問題,一般有三種方式
* 定長(zhǎng)協(xié)議(數(shù)據(jù)包長(zhǎng)度一致)
* 定長(zhǎng)的協(xié)議是指協(xié)議內(nèi)容的長(zhǎng)度是固定的,比如協(xié)議byte長(zhǎng)度是50,當(dāng)從網(wǎng)絡(luò)上讀取50個(gè)byte后,就進(jìn)行decode解碼操作。定長(zhǎng)協(xié)議在讀取或者寫入時(shí),效率比較高,因?yàn)閿?shù)據(jù)緩存的大小基本都確定了,就好比數(shù)組一樣,缺陷就是適應(yīng)性不足,以RPC場(chǎng)景為例,很難估計(jì)出定長(zhǎng)的長(zhǎng)度是多少。
* 特殊結(jié)束符(數(shù)據(jù)尾:通過特殊的字符標(biāo)識(shí)#)
* 相比定長(zhǎng)協(xié)議,如果能夠定義一個(gè)特殊字符作為每個(gè)協(xié)議單元結(jié)束的標(biāo)示,就能夠以變長(zhǎng)的方式進(jìn)行通信,從而在數(shù)據(jù)傳輸和高效之間取得平衡,比如用特殊字符`\n`。特殊結(jié)束符方式的問題是過于簡(jiǎn)單的思考了協(xié)議傳輸?shù)倪^程,對(duì)于一個(gè)協(xié)議單元必須要全部讀入才能夠進(jìn)行處理,除此之外必須要防止用戶傳輸?shù)臄?shù)據(jù)不能同結(jié)束符相同,否則就會(huì)出現(xiàn)紊亂。
* 變長(zhǎng)協(xié)議(協(xié)議頭+payload模式)
* 這種一般是自定義協(xié)議,會(huì)以定長(zhǎng)加不定長(zhǎng)的部分組成,其中定長(zhǎng)的部分需要描述不定長(zhǎng)的內(nèi)容長(zhǎng)度。
* dubbo就是使用這種形式的數(shù)據(jù)傳輸格式
Dubbo 框架定義了私有的RPC協(xié)議,其中請(qǐng)求和響應(yīng)協(xié)議的具體內(nèi)容我們使用表格來展示。

Dubbo 數(shù)據(jù)包分為消息頭和消息體,消息頭用于存儲(chǔ)一些元信息,比如魔數(shù)(Magic),數(shù)據(jù)包類型(Request/Response),消息體長(zhǎng)度(Data Length)等。消息體中用于存儲(chǔ)具體的調(diào)用消息,比如方法名稱,參數(shù)列表等。下面簡(jiǎn)單列舉一下消息頭的內(nèi)容。
| 偏移量(Bit) | 字段 | 取值 |
| ----------- | ------------ | ------------------------------------------------------------ |
| 0 ~ 7 | 魔數(shù)高位 | 0xda00 |
| 8 ~ 15 | 魔數(shù)低位 | 0xbb |
| 16 | 數(shù)據(jù)包類型 | 0 - Response, 1 - Request |
| 17 | 調(diào)用方式 | 僅在第16位被設(shè)為1的情況下有效,0 - 單向調(diào)用,1 - 雙向調(diào)用 |
| 18 | 事件標(biāo)識(shí) | 0 - 當(dāng)前數(shù)據(jù)包是請(qǐng)求或響應(yīng)包,1 - 當(dāng)前數(shù)據(jù)包是心跳包 |
| 19 ~ 23 | 序列化器編號(hào) | 2 - Hessian2Serialization
3 - JavaSerialization
4 - CompactedJavaSerialization
6 - FastJsonSerialization
7 - NativeJavaSerialization
8 - KryoSerialization
9 - FstSerialization |
| 24 ~ 31 | 狀態(tài) | 20 - OK 30 - CLIENT_TIMEOUT 31 - SERVER_TIMEOUT 40 - BAD_REQUEST 50 - BAD_RESPONSE ...... |
| 32 ~ 95 | 請(qǐng)求編號(hào) | 共8字節(jié),運(yùn)行時(shí)生成 |
| 96 ~ 127 | 消息體長(zhǎng)度 | 運(yùn)行時(shí)計(jì)算
消費(fèi)方發(fā)送請(qǐng)求
(1)發(fā)送請(qǐng)求
為了便于大家閱讀代碼,這里以 DemoService 為例,將 sayHello 方法的整個(gè)調(diào)用路徑貼出來。
proxy0#sayHello(String) —> InvokerInvocationHandler#invoke(Object, Method, Object[]) —> MockClusterInvoker#invoke(Invocation) —> AbstractClusterInvoker#invoke(Invocation) —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance) —> Filter#invoke(Invoker, Invocation) // 包含多個(gè) Filter 調(diào)用 —> ListenerInvokerWrapper#invoke(Invocation) —> AbstractInvoker#invoke(Invocation) —> DubboInvoker#doInvoke(Invocation) —> ReferenceCountExchangeClient#request(Object, int) —> HeaderExchangeClient#request(Object, int) —> HeaderExchangeChannel#request(Object, int) —> AbstractPeer#send(Object) —> AbstractClient#send(Object, boolean) —> NettyChannel#send(Object, boolean) —> NioClientSocketChannel#write(Object)
dubbo消費(fèi)方,自動(dòng)生成代碼對(duì)象如下
public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
private InvocationHandler handler;
public String sayHello(String string) {
// 將參數(shù)存儲(chǔ)到 Object 數(shù)組中
Object[] arrobject = new Object[]{string};
// 調(diào)用 InvocationHandler 實(shí)現(xiàn)類的 invoke 方法得到調(diào)用結(jié)果
Object object = this.handler.invoke(this, methods[0], arrobject);
// 返回調(diào)用結(jié)果
return (String)object;
}
}InvokerInvocationHandler 中的 invoker 成員變量類型為 MockClusterInvoker,MockClusterInvoker 內(nèi)部封裝了服務(wù)降級(jí)邏輯。下面簡(jiǎn)單看一下:
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
// 獲取 mock 配置值
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || value.equalsIgnoreCase("false")) {
// 無 mock 邏輯,直接調(diào)用其他 Invoker 對(duì)象的 invoke 方法,
// 比如 FailoverClusterInvoker
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
// force:xxx 直接執(zhí)行 mock 邏輯,不發(fā)起遠(yuǎn)程調(diào)用
result = doMockInvoke(invocation, null);
} else {
// fail:xxx 表示消費(fèi)方對(duì)調(diào)用服務(wù)失敗后,再執(zhí)行 mock 邏輯,不拋出異常
try {
result = this.invoker.invoke(invocation);
} catch (RpcException e) {
// 調(diào)用失敗,執(zhí)行 mock 邏輯
result = doMockInvoke(invocation, e);
}
}
return result;
}考慮到前文已經(jīng)詳細(xì)分析過 FailoverClusterInvoker,因此本節(jié)略過 FailoverClusterInvoker,直接分析 DubboInvoker。
public abstract class AbstractInvoker<T> implements Invoker<T> {
public Result invoke(Invocation inv) throws RpcException {
if (destroyed.get()) {
throw new RpcException("Rpc invoker for service ...");
}
RpcInvocation invocation = (RpcInvocation) inv;
// 設(shè)置 Invoker
invocation.setInvoker(this);
if (attachment != null && attachment.size() > 0) {
// 設(shè)置 attachment
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
// 添加 contextAttachments 到 RpcInvocation#attachment 變量中
invocation.addAttachments(contextAttachments);
}
if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
// 設(shè)置異步信息到 RpcInvocation#attachment 中
invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
try {
// 抽象方法,由子類實(shí)現(xiàn)
return doInvoke(invocation);
} catch (InvocationTargetException e) {
// ...
} catch (RpcException e) {
// ...
} catch (Throwable e) {
return new RpcResult(e);
}
}
protected abstract Result doInvoke(Invocation invocation) throws Throwable;
// 省略其他方法
}上面的代碼來自 AbstractInvoker 類,其中大部分代碼用于添加信息到 RpcInvocation#attachment 變量中,添加完畢后,調(diào)用 doInvoke 執(zhí)行后續(xù)的調(diào)用。doInvoke 是一個(gè)抽象方法,需要由子類實(shí)現(xiàn),下面到 DubboInvoker 中看一下。
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
//將目標(biāo)方法以及版本好作為參數(shù)放入到Invocation中
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
//獲得客戶端連接
ExchangeClient currentClient; //初始化invoker的時(shí)候,構(gòu)建的一個(gè)遠(yuǎn)程通信連接
if (clients.length == 1) { //默認(rèn)
currentClient = clients[0];
} else {
//通過取模獲得其中一個(gè)連接
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
//表示當(dāng)前的方法是否存在返回值
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
//isOneway 為 true,表示“單向”通信
if (isOneway) {//異步無返回值
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else { //存在返回值
//是否采用異步
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
responseFuture.whenComplete((obj, t) -> {
if (t != null) {
asyncRpcResult.completeExceptionally(t);
} else {
asyncRpcResult.complete((AppResponse) obj);
}
});
RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
return asyncRpcResult;
}
}
//省略無關(guān)代碼
}最終進(jìn)入到HeaderExchangeChannel#request方法,拼裝Request并將請(qǐng)求發(fā)送出去
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed tosend request " + request + ", cause: The channel " + this + " is closed!");
}
// 創(chuàng)建請(qǐng)求對(duì)象
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
try {
//NettyClient
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
} (2)請(qǐng)求編碼
在netty啟動(dòng)時(shí),我們?cè)O(shè)置了編解碼器,其中通過ExchangeCodec完成編解碼工作如下:
public class ExchangeCodec extends TelnetCodec {
// 消息頭長(zhǎng)度
protected static final int HEADER_LENGTH = 16;
// 魔數(shù)內(nèi)容
protected static final short MAGIC = (short) 0xdabb;
protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
protected static final byte FLAG_REQUEST = (byte) 0x80;
protected static final byte FLAG_TWOWAY = (byte) 0x40;
protected static final byte FLAG_EVENT = (byte) 0x20;
protected static final int SERIALIZATION_MASK = 0x1f;
private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);
public Short getMagicCode() {
return MAGIC;
}
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
if (msg instanceof Request) {
// 對(duì) Request 對(duì)象進(jìn)行編碼
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) {
// 對(duì) Response 對(duì)象進(jìn)行編碼,后面分析
encodeResponse(channel, buffer, (Response) msg);
} else {
super.encode(channel, buffer, msg);
}
}
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
Serialization serialization = getSerialization(channel);
// 創(chuàng)建消息頭字節(jié)數(shù)組,長(zhǎng)度為 16
byte[] header = new byte[HEADER_LENGTH];
// 設(shè)置魔數(shù)
Bytes.short2bytes(MAGIC, header);
// 設(shè)置數(shù)據(jù)包類型(Request/Response)和序列化器編號(hào)
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
// 設(shè)置通信方式(單向/雙向)
if (req.isTwoWay()) {
header[2] |= FLAG_TWOWAY;
}
// 設(shè)置事件標(biāo)識(shí)
if (req.isEvent()) {
header[2] |= FLAG_EVENT;
}
// 設(shè)置請(qǐng)求編號(hào),8個(gè)字節(jié),從第4個(gè)字節(jié)開始設(shè)置
Bytes.long2bytes(req.getId(), header, 4);
// 獲取 buffer 當(dāng)前的寫位置
int savedWriteIndex = buffer.writerIndex();
// 更新 writerIndex,為消息頭預(yù)留 16 個(gè)字節(jié)的空間
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
// 創(chuàng)建序列化器,比如 Hessian2ObjectOutput
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
// 對(duì)事件數(shù)據(jù)進(jìn)行序列化操作
encodeEventData(channel, out, req.getData());
} else {
// 對(duì)請(qǐng)求數(shù)據(jù)進(jìn)行序列化操作
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();
// 獲取寫入的字節(jié)數(shù),也就是消息體長(zhǎng)度
int len = bos.writtenBytes();
checkPayload(channel, len);
// 將消息體長(zhǎng)度寫入到消息頭中
Bytes.int2bytes(len, header, 12);
// 將 buffer 指針移動(dòng)到 savedWriteIndex,為寫消息頭做準(zhǔn)備
buffer.writerIndex(savedWriteIndex);
// 從 savedWriteIndex 下標(biāo)處寫入消息頭
buffer.writeBytes(header);
// 設(shè)置新的 writerIndex,writerIndex = 原寫下標(biāo) + 消息頭長(zhǎng)度 + 消息體長(zhǎng)度
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
// 省略其他方法
}以上就是請(qǐng)求對(duì)象的編碼過程,該過程首先會(huì)通過位運(yùn)算將消息頭寫入到 header 數(shù)組中。然后對(duì) Request 對(duì)象的 data 字段執(zhí)行序列化操作,序列化后的數(shù)據(jù)最終會(huì)存儲(chǔ)到 ChannelBuffer 中。序列化操作執(zhí)行完后,可得到數(shù)據(jù)序列化后的長(zhǎng)度 len,緊接著將 len 寫入到 header 指定位置處。最后再將消息頭字節(jié)數(shù)組 header 寫入到 ChannelBuffer 中,整個(gè)編碼過程就結(jié)束了。本節(jié)的最后,我們?cè)賮砜匆幌?Request 對(duì)象的 data 字段序列化過程,也就是 encodeRequestData 方法的邏輯,如下:
public class DubboCodec extends ExchangeCodec implements Codec2 {
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
RpcInvocation inv = (RpcInvocation) data;
// 依次序列化 dubbo version、path、version
out.writeUTF(version);
out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
// 序列化調(diào)用方法名
out.writeUTF(inv.getMethodName());
// 將參數(shù)類型轉(zhuǎn)換為字符串,并進(jìn)行序列化
out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
Object[] args = inv.getArguments();
if (args != null)
for (int i = 0; i < args.length; i++) {
// 對(duì)運(yùn)行時(shí)參數(shù)進(jìn)行序列化
out.writeObject(encodeInvocationArgument(channel, inv, i));
}
// 序列化 attachments
out.writeObject(inv.getAttachments());
}
} 至此,關(guān)于服務(wù)消費(fèi)方發(fā)送請(qǐng)求的過程就分析完了,接下來我們來看一下服務(wù)提供方是如何接收請(qǐng)求的。
提供方接收請(qǐng)求
(1) 請(qǐng)求解碼
這里直接分析請(qǐng)求數(shù)據(jù)的解碼邏輯,忽略中間過程,如下:
public class ExchangeCodec extends TelnetCodec {
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int readable = buffer.readableBytes();
// 創(chuàng)建消息頭字節(jié)數(shù)組
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
// 讀取消息頭數(shù)據(jù)
buffer.readBytes(header);
// 調(diào)用重載方法進(jìn)行后續(xù)解碼工作
return decode(channel, buffer, readable, header);
}
@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// 檢查魔數(shù)是否相等
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
int length = header.length;
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
// 通過 telnet 命令行發(fā)送的數(shù)據(jù)包不包含消息頭,所以這里
// 調(diào)用 TelnetCodec 的 decode 方法對(duì)數(shù)據(jù)包進(jìn)行解碼
return super.decode(channel, buffer, readable, header);
}
// 檢測(cè)可讀數(shù)據(jù)量是否少于消息頭長(zhǎng)度,若小于則立即返回 DecodeResult.NEED_MORE_INPUT
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}
// 從消息頭中獲取消息體長(zhǎng)度
int len = Bytes.bytes2int(header, 12);
// 檢測(cè)消息體長(zhǎng)度是否超出限制,超出則拋出異常
checkPayload(channel, len);
int tt = len + HEADER_LENGTH;
// 檢測(cè)可讀的字節(jié)數(shù)是否小于實(shí)際的字節(jié)數(shù)
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
try {
// 繼續(xù)進(jìn)行解碼工作
return decodeBody(channel, is, header);
} finally {
if (is.available() > 0) {
try {
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
}上面方法通過檢測(cè)消息頭中的魔數(shù)是否與規(guī)定的魔數(shù)相等,提前攔截掉非常規(guī)數(shù)據(jù)包,比如通過 telnet 命令行發(fā)出的數(shù)據(jù)包。接著再對(duì)消息體長(zhǎng)度,以及可讀字節(jié)數(shù)進(jìn)行檢測(cè)。最后調(diào)用 decodeBody 方法進(jìn)行后續(xù)的解碼工作,ExchangeCodec 中實(shí)現(xiàn)了 decodeBody 方法,但因其子類 DubboCodec 覆寫了該方法,所以在運(yùn)行時(shí) DubboCodec 中的 decodeBody 方法會(huì)被調(diào)用。下面我們來看一下該方法的代碼。
public class DubboCodec extends ExchangeCodec implements Codec2 {
@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
// 獲取消息頭中的第三個(gè)字節(jié),并通過邏輯與運(yùn)算得到序列化器編號(hào)
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
// 獲取調(diào)用編號(hào)
long id = Bytes.bytes2long(header, 4);
// 通過邏輯與運(yùn)算得到調(diào)用類型,0 - Response,1 - Request
if ((flag & FLAG_REQUEST) == 0) {
// 對(duì)響應(yīng)結(jié)果進(jìn)行解碼,得到 Response 對(duì)象。這個(gè)非本節(jié)內(nèi)容,后面再分析
// ...
} else {
// 創(chuàng)建 Request 對(duì)象
Request req = new Request(id);
req.setVersion(Version.getProtocolVersion());
// 通過邏輯與運(yùn)算得到通信方式,并設(shè)置到 Request 對(duì)象中
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
// 通過位運(yùn)算檢測(cè)數(shù)據(jù)包是否為事件類型
if ((flag & FLAG_EVENT) != 0) {
// 設(shè)置心跳事件到 Request 對(duì)象中
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
Object data;
if (req.isHeartbeat()) {
// 對(duì)心跳包進(jìn)行解碼,該方法已被標(biāo)注為廢棄
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
} else if (req.isEvent()) {
// 對(duì)事件數(shù)據(jù)進(jìn)行解碼
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
} else {
DecodeableRpcInvocation inv;
// 根據(jù) url 參數(shù)判斷是否在 IO 線程上對(duì)消息體進(jìn)行解碼
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
inv = new DecodeableRpcInvocation(channel, req, is, proto);
// 在當(dāng)前線程,也就是 IO 線程上進(jìn)行后續(xù)的解碼工作。此工作完成后,可將
// 調(diào)用方法名、attachment、以及調(diào)用參數(shù)解析出來
inv.decode();
} else {
// 僅創(chuàng)建 DecodeableRpcInvocation 對(duì)象,但不在當(dāng)前線程上執(zhí)行解碼邏輯
inv = new DecodeableRpcInvocation(channel, req,
new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
data = inv;
}
// 設(shè)置 data 到 Request 對(duì)象中
req.setData(data);
} catch (Throwable t) {
// 若解碼過程中出現(xiàn)異常,則將 broken 字段設(shè)為 true,
// 并將異常對(duì)象設(shè)置到 Reqeust 對(duì)象中
req.setBroken(true);
req.setData(t);
}
return req;
}
}
} 如上,decodeBody 對(duì)部分字段進(jìn)行了解碼,并將解碼得到的字段封裝到 Request 中。隨后會(huì)調(diào)用 DecodeableRpcInvocation 的 decode 方法進(jìn)行后續(xù)的解碼工作。此工作完成后,可將調(diào)用方法名、attachment、以及調(diào)用參數(shù)解析出來。
(2)調(diào)用服務(wù)
解碼器將數(shù)據(jù)包解析成 Request 對(duì)象后,NettyHandler 的 messageReceived 方法緊接著會(huì)收到這個(gè)對(duì)象,并將這個(gè)對(duì)象繼續(xù)向下傳遞。整個(gè)調(diào)用棧如下:
NettyServerHandler#channelRead(ChannelHandlerContext, MessageEvent) —> AbstractPeer#received(Channel, Object) —> MultiMessageHandler#received(Channel, Object) —> HeartbeatHandler#received(Channel, Object) —> AllChannelHandler#received(Channel, Object) —> ExecutorService#execute(Runnable) // 由線程池執(zhí)行后續(xù)的調(diào)用邏輯
這里我們直接分析調(diào)用棧中的分析第一個(gè)和最后一個(gè)調(diào)用方法邏輯。如下:
考慮到篇幅,以及很多中間調(diào)用的邏輯并非十分重要,所以這里就不對(duì)調(diào)用棧中的每個(gè)方法都進(jìn)行分析了。這里我們直接分析最后一個(gè)調(diào)用方法邏輯。如下:
public class ChannelEventRunnable implements Runnable {
private final ChannelHandler handler;
private final Channel channel;
private final ChannelState state;
private final Throwable exception;
private final Object message;
@Override
public void run() {
// 檢測(cè)通道狀態(tài),對(duì)于請(qǐng)求或響應(yīng)消息,此時(shí) state = RECEIVED
if (state == ChannelState.RECEIVED) {
try {
// 將 channel 和 message 傳給 ChannelHandler 對(duì)象,進(jìn)行后續(xù)的調(diào)用
handler.received(channel, message);
} catch (Exception e) {
logger.warn("... operation error, channel is ... message is ...");
}
}
// 其他消息類型通過 switch 進(jìn)行處理
else {
switch (state) {
case CONNECTED:
try {
handler.connected(channel);
} catch (Exception e) {
logger.warn("... operation error, channel is ...");
}
break;
case DISCONNECTED:
// ...
case SENT:
// ...
case CAUGHT:
// ...
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}
}
}如上,請(qǐng)求和響應(yīng)消息出現(xiàn)頻率明顯比其他類型消息高,所以這里對(duì)該類型的消息進(jìn)行了針對(duì)性判斷。ChannelEventRunnable 僅是一個(gè)中轉(zhuǎn)站,它的 run 方法中并不包含具體的調(diào)用邏輯,僅用于將參數(shù)傳給其他 ChannelHandler 對(duì)象進(jìn)行處理,該對(duì)象類型為 DecodeHandler。
public class DecodeHandler extends AbstractChannelHandlerDelegate {
public DecodeHandler(ChannelHandler handler) {
super(handler);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
// 對(duì) Decodeable 接口實(shí)現(xiàn)類對(duì)象進(jìn)行解碼
decode(message);
}
if (message instanceof Request) {
// 對(duì) Request 的 data 字段進(jìn)行解碼
decode(((Request) message).getData());
}
if (message instanceof Response) {
// 對(duì) Request 的 result 字段進(jìn)行解碼
decode(((Response) message).getResult());
}
// 執(zhí)行后續(xù)邏輯
handler.received(channel, message);
}
private void decode(Object message) {
// Decodeable 接口目前有兩個(gè)實(shí)現(xiàn)類,
// 分別為 DecodeableRpcInvocation 和 DecodeableRpcResult
if (message != null && message instanceof Decodeable) {
try {
// 執(zhí)行解碼邏輯
((Decodeable) message).decode();
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
}
}
}
}
}DecodeHandler 主要是包含了一些解碼邏輯,完全解碼后的 Request 對(duì)象會(huì)繼續(xù)向后傳遞
public class DubboProtocol extends AbstractProtocol {
public static final String NAME = "dubbo";
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
// 獲取 Invoker 實(shí)例
Invoker<?> invoker = getInvoker(channel, inv);
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
// 回調(diào)相關(guān),忽略
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
// 通過 Invoker 調(diào)用具體的服務(wù)
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: ...");
}
// 忽略其他方法
}
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
// 忽略回調(diào)和本地存根相關(guān)邏輯
// ...
int port = channel.getLocalAddress().getPort();
// 計(jì)算 service key,格式為 groupName/serviceName:serviceVersion:port。比如:
// dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
// 從 exporterMap 查找與 serviceKey 相對(duì)應(yīng)的 DubboExporter 對(duì)象,
// 服務(wù)導(dǎo)出過程中會(huì)將 <serviceKey, DubboExporter> 映射關(guān)系存儲(chǔ)到 exporterMap 集合中
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
if (exporter == null)
throw new RemotingException(channel, "Not found exported service ...");
// 獲取 Invoker 對(duì)象,并返回
return exporter.getInvoker();
}
// 忽略其他方法
}在之前課程中介紹過,服務(wù)全部暴露完成之后保存到exporterMap中。這里就是通過serviceKey獲取exporter之后獲取Invoker,并通過 Invoker 的 invoke 方法調(diào)用服務(wù)邏輯
public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
@Override
public Result invoke(Invocation invocation) throws RpcException {
try {
// 調(diào)用 doInvoke 執(zhí)行后續(xù)的調(diào)用,并將調(diào)用結(jié)果封裝到 RpcResult 中,并
return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
} catch (InvocationTargetException e) {
return new RpcResult(e.getTargetException());
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method ...");
}
}
protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
}如上,doInvoke 是一個(gè)抽象方法,這個(gè)需要由具體的 Invoker 實(shí)例實(shí)現(xiàn)。Invoker 實(shí)例是在運(yùn)行時(shí)通過 JavassistProxyFactory 創(chuàng)建的,創(chuàng)建邏輯如下:
public class JavassistProxyFactory extends AbstractProxyFactory {
// 省略其他方法
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// 創(chuàng)建匿名類對(duì)象
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 調(diào)用 invokeMethod 方法進(jìn)行后續(xù)的調(diào)用
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}Wrapper 是一個(gè)抽象類,其中 invokeMethod 是一個(gè)抽象方法。Dubbo 會(huì)在運(yùn)行時(shí)通過 Javassist 框架為 Wrapper 生成實(shí)現(xiàn)類,并實(shí)現(xiàn) invokeMethod 方法,該方法最終會(huì)根據(jù)調(diào)用信息調(diào)用具體的服務(wù)。以 DemoServiceImpl 為例,Javassist 為其生成的代理類如下。
/** Wrapper0 是在運(yùn)行時(shí)生成的,大家可使用 Arthas 進(jìn)行反編譯 */
public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
public static String[] pns;
public static Map pts;
public static String[] mns;
public static String[] dmns;
public static Class[] mts0;
// 省略其他方法
public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
DemoService demoService;
try {
// 類型轉(zhuǎn)換
demoService = (DemoService)object;
}
catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
try {
// 根據(jù)方法名調(diào)用指定的方法
if ("sayHello".equals(string) && arrclass.length == 1) {
return demoService.sayHello((String)arrobject[0]);
}
}
catch (Throwable throwable) {
throw new InvocationTargetException(throwable);
}
throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString());
}
}到這里,整個(gè)服務(wù)調(diào)用過程就分析完了。最后把調(diào)用過程貼出來,如下:
ChannelEventRunnable#run() —> DecodeHandler#received(Channel, Object) —> HeaderExchangeHandler#received(Channel, Object) —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request) —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object) —> Filter#invoke(Invoker, Invocation) —> AbstractProxyInvoker#invoke(Invocation) —> Wrapper0#invokeMethod(Object, String, Class[], Object[]) —> DemoServiceImpl#sayHello(String)
提供方返回調(diào)用結(jié)果
服務(wù)提供方調(diào)用指定服務(wù)后,會(huì)將調(diào)用結(jié)果封裝到 Response 對(duì)象中,并將該對(duì)象返回給服務(wù)消費(fèi)方。服務(wù)提供方也是通過 NettyChannel 的 send 方法將 Response 對(duì)象返回,這里就不在重復(fù)分析了。本節(jié)我們僅需關(guān)注 Response 對(duì)象的編碼過程即可。
public class ExchangeCodec extends TelnetCodec {
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
if (msg instanceof Request) {
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) {
// 對(duì)響應(yīng)對(duì)象進(jìn)行編碼
encodeResponse(channel, buffer, (Response) msg);
} else {
super.encode(channel, buffer, msg);
}
}
protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
int savedWriteIndex = buffer.writerIndex();
try {
Serialization serialization = getSerialization(channel);
// 創(chuàng)建消息頭字節(jié)數(shù)組
byte[] header = new byte[HEADER_LENGTH];
// 設(shè)置魔數(shù)
Bytes.short2bytes(MAGIC, header);
// 設(shè)置序列化器編號(hào)
header[2] = serialization.getContentTypeId();
if (res.isHeartbeat()) header[2] |= FLAG_EVENT;
// 獲取響應(yīng)狀態(tài)
byte status = res.getStatus();
// 設(shè)置響應(yīng)狀態(tài)
header[3] = status;
// 設(shè)置請(qǐng)求編號(hào)
Bytes.long2bytes(res.getId(), header, 4);
// 更新 writerIndex,為消息頭預(yù)留 16 個(gè)字節(jié)的空間
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (status == Response.OK) {
if (res.isHeartbeat()) {
// 對(duì)心跳響應(yīng)結(jié)果進(jìn)行序列化,已廢棄
encodeHeartbeatData(channel, out, res.getResult());
} else {
// 對(duì)調(diào)用結(jié)果進(jìn)行序列化
encodeResponseData(channel, out, res.getResult(), res.getVersion());
}
} else {
// 對(duì)錯(cuò)誤信息進(jìn)行序列化
out.writeUTF(res.getErrorMessage())
};
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();
// 獲取寫入的字節(jié)數(shù),也就是消息體長(zhǎng)度
int len = bos.writtenBytes();
checkPayload(channel, len);
// 將消息體長(zhǎng)度寫入到消息頭中
Bytes.int2bytes(len, header, 12);
// 將 buffer 指針移動(dòng)到 savedWriteIndex,為寫消息頭做準(zhǔn)備
buffer.writerIndex(savedWriteIndex);
// 從 savedWriteIndex 下標(biāo)處寫入消息頭
buffer.writeBytes(header);
// 設(shè)置新的 writerIndex,writerIndex = 原寫下標(biāo) + 消息頭長(zhǎng)度 + 消息體長(zhǎng)度
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
} catch (Throwable t) {
// 異常處理邏輯不是很難理解,但是代碼略多,這里忽略了
}
}
}
public class DubboCodec extends ExchangeCodec implements Codec2 {
protected void encodeResponseData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
Result result = (Result) data;
// 檢測(cè)當(dāng)前協(xié)議版本是否支持帶有 attachment 集合的 Response 對(duì)象
boolean attach = Version.isSupportResponseAttachment(version);
Throwable th = result.getException();
// 異常信息為空
if (th == null) {
Object ret = result.getValue();
// 調(diào)用結(jié)果為空
if (ret == null) {
// 序列化響應(yīng)類型
out.writeByte(attach ? RESPONSE_NULL_VALUE_WITH_ATTACHMENTS : RESPONSE_NULL_VALUE);
}
// 調(diào)用結(jié)果非空
else {
// 序列化響應(yīng)類型
out.writeByte(attach ? RESPONSE_VALUE_WITH_ATTACHMENTS : RESPONSE_VALUE);
// 序列化調(diào)用結(jié)果
out.writeObject(ret);
}
}
// 異常信息非空
else {
// 序列化響應(yīng)類型
out.writeByte(attach ? RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS : RESPONSE_WITH_EXCEPTION);
// 序列化異常對(duì)象
out.writeObject(th);
}
if (attach) {
// 記錄 Dubbo 協(xié)議版本
result.getAttachments().put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
// 序列化 attachments 集合
out.writeObject(result.getAttachments());
}
}
} 以上就是 Response 對(duì)象編碼的過程,和前面分析的 Request 對(duì)象編碼過程很相似。如果大家能看 Request 對(duì)象的編碼邏輯,那么這里的 Response 對(duì)象的編碼邏輯也不難理解,就不多說了。接下來我們?cè)賮矸治鲭p向通信的最后一環(huán) —— 服務(wù)消費(fèi)方接收調(diào)用結(jié)果。
消費(fèi)方接收調(diào)用結(jié)果
服務(wù)消費(fèi)方在收到響應(yīng)數(shù)據(jù)后,首先要做的事情是對(duì)響應(yīng)數(shù)據(jù)進(jìn)行解碼,得到 Response 對(duì)象。然后再將該對(duì)象傳遞給下一個(gè)入站處理器,這個(gè)入站處理器就是 NettyHandler。接下來 NettyHandler 會(huì)將這個(gè)對(duì)象繼續(xù)向下傳遞,最后 AllChannelHandler 的 received 方法會(huì)收到這個(gè)對(duì)象,并將這個(gè)對(duì)象派發(fā)到線程池中。這個(gè)過程和服務(wù)提供方接收請(qǐng)求的過程是一樣的,因此這里就不重復(fù)分析了。
(1)響應(yīng)數(shù)據(jù)解碼
響應(yīng)數(shù)據(jù)解碼邏輯主要的邏輯封裝在 DubboCodec 中,我們直接分析這個(gè)類的代碼。如下:
public class DubboCodec extends ExchangeCodec implements Codec2 {
@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
// 獲取請(qǐng)求編號(hào)
long id = Bytes.bytes2long(header, 4);
// 檢測(cè)消息類型,若下面的條件成立,表明消息類型為 Response
if ((flag & FLAG_REQUEST) == 0) {
// 創(chuàng)建 Response 對(duì)象
Response res = new Response(id);
// 檢測(cè)事件標(biāo)志位
if ((flag & FLAG_EVENT) != 0) {
// 設(shè)置心跳事件
res.setEvent(Response.HEARTBEAT_EVENT);
}
// 獲取響應(yīng)狀態(tài)
byte status = header[3];
// 設(shè)置響應(yīng)狀態(tài)
res.setStatus(status);
// 如果響應(yīng)狀態(tài)為 OK,表明調(diào)用過程正常
if (status == Response.OK) {
try {
Object data;
if (res.isHeartbeat()) {
// 反序列化心跳數(shù)據(jù),已廢棄
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
} else if (res.isEvent()) {
// 反序列化事件數(shù)據(jù)
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
} else {
DecodeableRpcResult result;
// 根據(jù) url 參數(shù)決定是否在 IO 線程上執(zhí)行解碼邏輯
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
// 創(chuàng)建 DecodeableRpcResult 對(duì)象
result = new DecodeableRpcResult(channel, res, is,
(Invocation) getRequestData(id), proto);
// 進(jìn)行后續(xù)的解碼工作
result.decode();
} else {
// 創(chuàng)建 DecodeableRpcResult 對(duì)象
result = new DecodeableRpcResult(channel, res,
new UnsafeByteArrayInputStream(readMessageData(is)),
(Invocation) getRequestData(id), proto);
}
data = result;
}
// 設(shè)置 DecodeableRpcResult 對(duì)象到 Response 對(duì)象中
res.setResult(data);
} catch (Throwable t) {
// 解碼過程中出現(xiàn)了錯(cuò)誤,此時(shí)設(shè)置 CLIENT_ERROR 狀態(tài)碼到 Response 對(duì)象中
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
}
// 響應(yīng)狀態(tài)非 OK,表明調(diào)用過程出現(xiàn)了異常
else {
// 反序列化異常信息,并設(shè)置到 Response 對(duì)象中
res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
}
return res;
} else {
// 對(duì)請(qǐng)求數(shù)據(jù)進(jìn)行解碼,前面已分析過,此處忽略
}
}
}以上就是響應(yīng)數(shù)據(jù)的解碼過程,上面邏輯看起來是不是似曾相識(shí)。對(duì)的,我們?cè)谇懊嬲鹿?jié)分析過 DubboCodec 的 decodeBody 方法中關(guān)于請(qǐng)求數(shù)據(jù)的解碼過程,該過程和響應(yīng)數(shù)據(jù)的解碼過程很相似。下面,我們繼續(xù)分析調(diào)用結(jié)果的反序列化過程
public class DecodeableRpcResult extends AppResponse implements Codec, Decodeable {
private static final Logger log = LoggerFactory.getLogger(DecodeableRpcResult.class);
private Channel channel;
private byte serializationType;
private InputStream inputStream;
private Response response;
private Invocation invocation;
private volatile boolean hasDecoded;
public DecodeableRpcResult(Channel channel, Response response, InputStream is, Invocation invocation, byte id) {
Assert.notNull(channel, "channel == null");
Assert.notNull(response, "response == null");
Assert.notNull(is, "inputStream == null");
this.channel = channel;
this.response = response;
this.inputStream = is;
this.invocation = invocation;
this.serializationType = id;
}
@Override
public void encode(Channel channel, OutputStream output, Object message) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public Object decode(Channel channel, InputStream input) throws IOException {
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);
// 反序列化響應(yīng)類型
byte flag = in.readByte();
switch (flag) {
case DubboCodec.RESPONSE_NULL_VALUE:
break;
case DubboCodec.RESPONSE_VALUE:
handleValue(in);
break;
case DubboCodec.RESPONSE_WITH_EXCEPTION:
handleException(in);
break;
// 返回值為空,且攜帶了 attachments 集合
case DubboCodec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
handleAttachment(in);
break;
//返回值不為空,且攜帶了 attachments 集合
case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS:
handleValue(in);
handleAttachment(in);
break;
// 異常對(duì)象不為空,且攜帶了 attachments 集合
case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
handleException(in);
handleAttachment(in);
break;
default:
throw new IOException("Unknown result flag, expect '0' '1' '2' '3' '4' '5', but received: " + flag);
}
if (in instanceof Cleanable) {
((Cleanable) in).cleanup();
}
return this;
} 正常調(diào)用下,線程會(huì)進(jìn)入 RESPONSE_VALUE_WITH_ATTACHMENTS 分支中。然后線程會(huì)從 invocation 變量(大家探索一下 invocation 變量的由來)中獲取返回值類型,接著對(duì)調(diào)用結(jié)果進(jìn)行反序列化,并將序列化后的結(jié)果存儲(chǔ)起來。最后對(duì) attachments 集合進(jìn)行反序列化,并存到指定字段中。
異步轉(zhuǎn)同步
Dubbo發(fā)送數(shù)據(jù)至服務(wù)方后,在通信層面是異步的,通信線程并不會(huì)等待結(jié)果數(shù)據(jù)返回。而我們?cè)谑褂肈ubbo進(jìn)行RPC調(diào)用缺省就是同步的,這其中就涉及到了異步轉(zhuǎn)同步的操作。
而在2.7.x版本中,這種自實(shí)現(xiàn)的異步轉(zhuǎn)同步操作進(jìn)行了修改。新的`DefaultFuture`繼承了`CompletableFuture`,新的`doReceived(Response res)`方法如下:
private void doReceived(Response res) {
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
this.complete(res.getResult());
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
} else {
this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
}
}通過`CompletableFuture#complete`方法來設(shè)置異步的返回結(jié)果,且刪除舊的`get()`方法,使用`CompletableFuture#get()`方法:
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
} 使用`CompletableFuture`完成了異步轉(zhuǎn)同步的操作。
異步多線程數(shù)據(jù)一致
這里簡(jiǎn)單說明一下。一般情況下,服務(wù)消費(fèi)方會(huì)并發(fā)調(diào)用多個(gè)服務(wù),每個(gè)用戶線程發(fā)送請(qǐng)求后,會(huì)調(diào)用 get 方法進(jìn)行等待。 一段時(shí)間后,服務(wù)消費(fèi)方的線程池會(huì)收到多個(gè)響應(yīng)對(duì)象。這個(gè)時(shí)候要考慮一個(gè)問題,如何將每個(gè)響應(yīng)對(duì)象傳遞給相應(yīng)的 Future 對(duì)象,不出錯(cuò)。答案是通過調(diào)用**編號(hào)**。Future 被創(chuàng)建時(shí),會(huì)要求傳入一個(gè) Request 對(duì)象。此時(shí) DefaultFuture 可從 Request 對(duì)象中獲取調(diào)用編號(hào),并將 <調(diào)用編號(hào), DefaultFuture 對(duì)象> 映射關(guān)系存入到靜態(tài) Map 中,即 FUTURES。線程池中的線程在收到 Response 對(duì)象后,會(huì)根據(jù) Response 對(duì)象中的調(diào)用編號(hào)到 FUTURES 集合中取出相應(yīng)的 DefaultFuture 對(duì)象,然后再將 Response 對(duì)象設(shè)置到 DefaultFuture 對(duì)象中。這樣用戶線程即可從 DefaultFuture 對(duì)象中獲取調(diào)用結(jié)果了。整個(gè)過程大致如下圖:

private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);
CHANNELS.put(id, channel);
} 心跳檢查
Dubbo采用雙向心跳的方式檢測(cè)Client端與Server端的連通性。
我們?cè)賮砜纯?Dubbo 是如何設(shè)計(jì)應(yīng)用層心跳的。Dubbo 的心跳是雙向心跳,客戶端會(huì)給服務(wù)端發(fā)送心跳,反之,服務(wù)端也會(huì)向客戶端發(fā)送心跳。
創(chuàng)建定時(shí)器
public class HeaderExchangeClient implements ExchangeClient {
private final Client client;
private final ExchangeChannel channel;
private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(
new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, TICKS_PER_WHEEL);
private HeartbeatTimerTask heartBeatTimerTask;
private ReconnectTimerTask reconnectTimerTask;
public HeaderExchangeClient(Client client, boolean startTimer) {
Assert.notNull(client, "Client can't be null");
this.client = client;
this.channel = new HeaderExchangeChannel(client);
if (startTimer) {
URL url = client.getUrl();
//開啟心跳失敗之后處理重連,斷連的邏輯定時(shí)任務(wù)
startReconnectTask(url);
//開啟發(fā)送心跳請(qǐng)求定時(shí)任務(wù)
startHeartBeatTask(url);
}
} Dubbo 在 `HeaderExchangeClient `初始化時(shí)開啟了兩個(gè)定時(shí)任務(wù)
`startReconnectTask` 主要用于定時(shí)發(fā)送心跳請(qǐng)求
`startHeartBeatTask` 主要用于心跳失敗之后處理重連,斷連的邏輯
發(fā)送心跳請(qǐng)求
詳細(xì)解析下心跳檢測(cè)定時(shí)任務(wù)的邏輯 `HeartbeatTimerTask#doTask`:
protected void doTask(Channel channel) {
Long lastRead = lastRead(channel);
Long lastWrite = lastWrite(channel);
if ((lastRead != null && now() - lastRead > heartbeat)
|| (lastWrite != null && now() - lastWrite > heartbeat)) {
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setEvent(Request.HEARTBEAT_EVENT);
channel.send(req);
}
} 前面已經(jīng)介紹過,**Dubbo 采取的是雙向心跳設(shè)計(jì)**,即服務(wù)端會(huì)向客戶端發(fā)送心跳,客戶端也會(huì)向服務(wù)端發(fā)送心跳,接收的一方更新 lastRead 字段,發(fā)送的一方更新 lastWrite 字段,超過心跳間隙的時(shí)間,便發(fā)送心跳請(qǐng)求給對(duì)端。這里的 lastRead/lastWrite 同樣會(huì)被同一個(gè)通道上的普通調(diào)用更新,通過更新這兩個(gè)字段,實(shí)現(xiàn)了只在連接空閑時(shí)才會(huì)真正發(fā)送空閑報(bào)文的機(jī)制,符合我們一開始科普的做法。
處理重連和斷連
繼續(xù)研究下重連和斷連定時(shí)器都實(shí)現(xiàn)了什么 `ReconnectTimerTask#doTask`。
protected void doTask(Channel channel) {
Long lastRead = lastRead(channel);
Long now = now();
if (!channel.isConnected()) {
((Client) channel).reconnect();
// check pong at client
} else if (lastRead != null && now - lastRead > idleTimeout) {
((Client) channel).reconnect();
}
} 第二個(gè)定時(shí)器則負(fù)責(zé)根據(jù)客戶端、服務(wù)端類型來對(duì)連接做不同的處理,當(dāng)超過設(shè)置的心跳總時(shí)間之后,客戶端選擇的是重新連接,服務(wù)端則是選擇直接斷開連接。這樣的考慮是合理的,客戶端調(diào)用是強(qiáng)依賴可用連接的,而服務(wù)端可以等待客戶端重新建立連接。
Dubbo 對(duì)于建立的每一個(gè)連接,同時(shí)在客戶端和服務(wù)端開啟了 2 個(gè)定時(shí)器,一個(gè)用于定時(shí)發(fā)送心跳,一個(gè)用于定時(shí)重連、斷連,執(zhí)行的頻率均為各自檢測(cè)周期的 1/3。定時(shí)發(fā)送心跳的任務(wù)負(fù)責(zé)在連接空閑時(shí),向?qū)Χ税l(fā)送心跳包。定時(shí)重連、斷連的任務(wù)負(fù)責(zé)檢測(cè) lastRead 是否在超時(shí)周期內(nèi)仍未被更新,如果判定為超時(shí),客戶端處理的邏輯是重連,服務(wù)端則采取斷連的措施。