首頁技術文章正文

Java培訓:dubbo源碼解析-網(wǎng)絡通信

更新時間:2022-09-16 來源:黑馬程序員 瀏覽量:

  在之前的內(nèi)容中,我們講解了消費者端服務發(fā)現(xiàn)與提供者端服務暴露的相關內(nèi)容,同時也知道消費者端通過內(nèi)置的負載均衡算法獲取合適的調(diào)用invoker進行遠程調(diào)用。那么,本章節(jié)重點關注的就是遠程調(diào)用過程即網(wǎng)絡通信。

  

1663308753668_1.jpg

  網(wǎng)絡通信位于Remoting模塊:

  - Remoting 實現(xiàn)是 Dubbo 協(xié)議的實現(xiàn),如果你選擇 RMI 協(xié)議,整個 Remoting 都不會用上;

  - Remoting 內(nèi)部再劃為 `Transport 傳輸層` 和 `Exchange 信息交換層`;

  - Transport 層只負責單向消息傳輸,是對 Mina, Netty, Grizzly 的抽象,它也可以擴展 UDP 傳輸;

  - Exchange 層是在傳輸層之上封裝了 Request-Response 語義;

  網(wǎng)絡通信的問題:

  客戶端與服務端連通性問題

  粘包拆包問題

  異步多線程數(shù)據(jù)一致問題

  通信協(xié)議

  dubbo內(nèi)置,dubbo協(xié)議 ,rmi協(xié)議,hessian協(xié)議,http協(xié)議,webservice協(xié)議,thrift協(xié)議,rest協(xié)議,grpc協(xié)議,memcached協(xié)議,redis協(xié)議等10種通訊協(xié)議。各個協(xié)議特點如下

  dubbo協(xié)議

  Dubbo 缺省協(xié)議采用單一長連接和 NIO 異步通訊,適合于小數(shù)據(jù)量大并發(fā)的服務調(diào)用,以及服務消費者機器數(shù)遠大于服務提供者機器數(shù)的情況。

  缺省協(xié)議,使用基于 mina `1.1.7` 和 hessian `3.2.1` 的 tbremoting 交互。

  - 連接個數(shù):單連接

  - 連接方式:長連接

  - 傳輸協(xié)議:TCP

  - 傳輸方式:NIO 異步傳輸

  - 序列化:Hessian 二進制序列化

  - 適用范圍:傳入傳出參數(shù)數(shù)據(jù)包較小(建議小于100K),消費者比提供者個數(shù)多,單一消費者無法壓滿提供者,盡量不要用 dubbo 協(xié)議傳輸大文件或超大字符串。

  - 適用場景:常規(guī)遠程服務方法調(diào)用

  rmi協(xié)議

  RMI 協(xié)議采用 JDK 標準的 `java.rmi.*` 實現(xiàn),采用阻塞式短連接和 JDK 標準序列化方式。

  - 連接個數(shù):多連接

  - 連接方式:短連接

  - 傳輸協(xié)議:TCP

  - 傳輸方式:同步傳輸

  - 序列化:Java 標準二進制序列化

  - 適用范圍:傳入傳出參數(shù)數(shù)據(jù)包大小混合,消費者與提供者個數(shù)差不多,可傳文件。

  - 適用場景:常規(guī)遠程服務方法調(diào)用,與原生RMI服務互操作

  hessian協(xié)議

  Hessian 協(xié)議用于集成 Hessian 的服務,Hessian 底層采用 Http 通訊,采用 Servlet 暴露服務,Dubbo 缺省內(nèi)嵌 Jetty 作為服務器實現(xiàn)。

  Dubbo 的 Hessian 協(xié)議可以和原生 Hessian 服務互操作,即:

  - 提供者用 Dubbo 的 Hessian 協(xié)議暴露服務,消費者直接用標準 Hessian 接口調(diào)用

  - 或者提供方用標準 Hessian 暴露服務,消費方用 Dubbo 的 Hessian 協(xié)議調(diào)用。

  - 連接個數(shù):多連接

  - 連接方式:短連接

  - 傳輸協(xié)議:HTTP

  - 傳輸方式:同步傳輸

  - 序列化:Hessian二進制序列化

  - 適用范圍:傳入傳出參數(shù)數(shù)據(jù)包較大,提供者比消費者個數(shù)多,提供者壓力較大,可傳文件。

  - 適用場景:頁面?zhèn)鬏?,文件傳輸,或與原生hessian服務互操作

  http協(xié)議

  基于 HTTP 表單的遠程調(diào)用協(xié)議,采用 Spring 的 HttpInvoker 實現(xiàn)

  - 連接個數(shù):多連接

  - 連接方式:短連接

  - 傳輸協(xié)議:HTTP

  - 傳輸方式:同步傳輸

  - 序列化:表單序列化

  - 適用范圍:傳入傳出參數(shù)數(shù)據(jù)包大小混合,提供者比消費者個數(shù)多,可用瀏覽器查看,可用表單或URL傳入?yún)?shù),暫不支持傳文件。

  - 適用場景:需同時給應用程序和瀏覽器 JS 使用的服務。

  webservice協(xié)議

  基于 WebService 的遠程調(diào)用協(xié)議,基于 Apache CXF 實現(xiàn)](http://dubbo.apache.org/zh-cn/docs/user/references/protocol/webservice.html#fn2)。

  可以和原生 WebService 服務互操作,即:

  - 提供者用 Dubbo 的 WebService 協(xié)議暴露服務,消費者直接用標準 WebService 接口調(diào)用,

  - 或者提供方用標準 WebService 暴露服務,消費方用 Dubbo 的 WebService 協(xié)議調(diào)用。

  - 連接個數(shù):多連接

  - 連接方式:短連接

  - 傳輸協(xié)議:HTTP

  - 傳輸方式:同步傳輸

  - 序列化:SOAP 文本序列化(http + xml)

  - 適用場景:系統(tǒng)集成,跨語言調(diào)用

  thrift協(xié)議

  當前 dubbo 支持 [[1\]](http://dubbo.apache.org/zh-cn/docs/user/references/protocol/thrift.html#fn1)的 thrift 協(xié)議是對 thrift 原生協(xié)議 [[2\]](http://dubbo.apache.org/zh-cn/docs/user/references/protocol/thrift.html#fn2) 的擴展,在原生協(xié)議的基礎上添加了一些額外的頭信息,比如 service name,magic number 等。

  rest協(xié)議

  基于標準的Java REST API——JAX-RS 2.0(Java API for RESTful Web Services的簡寫)實現(xiàn)的REST調(diào)用支持

  grpc協(xié)議

  Dubbo 自 2.7.5 版本開始支持 gRPC 協(xié)議,對于計劃使用 HTTP/2 通信,或者想利用 gRPC 帶來的 Stream、反壓、Reactive 編程等能力的開發(fā)者來說, 都可以考慮啟用 gRPC 協(xié)議。

  - 為期望使用 gRPC 協(xié)議的用戶帶來服務治理能力,方便接入 Dubbo 體系

  - 用戶可以使用 Dubbo 風格的,基于接口的編程風格來定義和使用遠程服務

  memcached協(xié)議

  基于 memcached實現(xiàn)的 RPC 協(xié)議

  redis協(xié)議

  基于 Redis 實現(xiàn)的 RPC 協(xié)議

  序列化

  序列化就是將對象轉成字節(jié)流,用于網(wǎng)絡傳輸,以及將字節(jié)流轉為對象,用于在收到字節(jié)流數(shù)據(jù)后還原成對象。序列化的優(yōu)勢有很多,例如安全性更好、可跨平臺等。我們知道dubbo基于netty進行網(wǎng)絡通訊,在`NettyClient.doOpen()`方法中可以看到Netty的相關類

bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    public ChannelPipeline getPipeline() {
        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
        ChannelPipeline pipeline = Channels.pipeline();
        pipeline.addLast("decoder", adapter.getDecoder());
        pipeline.addLast("encoder", adapter.getEncoder());
        pipeline.addLast("handler", nettyHandler);
        return pipeline;
    }
});

  然后去看NettyCodecAdapter 類最后進入ExchangeCodec類的encodeRequest方法,如下:

protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
        Serialization serialization = getSerialization(channel);
        // header.
        byte[] header = new byte[HEADER_LENGTH];

  是的,就是Serialization接口,默認是Hessian2Serialization序列化接口。

1663309013130_2.jpg

  Dubbo序列化支持java、compactedjava、nativejava、fastjson、dubbo、fst、hessian2、kryo,protostuff其中默認hessian2。其中java、compactedjava、nativejava屬于原生java的序列化。

  - dubbo序列化:阿里尚未開發(fā)成熟的高效java序列化實現(xiàn),阿里不建議在生產(chǎn)環(huán)境使用它。

  - **hessian2序列化:hessian是一種跨語言的高效二進制序列化方式。但這里實際不是原生的hessian2序列化,而是阿里修改過的,它是dubbo RPC默認啟用的序列化方式。**

  - json序列化:目前有兩種實現(xiàn),一種是采用的阿里的fastjson庫,另一種是采用dubbo中自己實現(xiàn)的簡單json庫,但其實現(xiàn)都不是特別成熟,而且json這種文本序列化性能一般不如上面兩種二進制序列化。

  - java序列化:主要是采用JDK自帶的Java序列化實現(xiàn),性能很不理想。

  最近幾年,各種新的高效序列化方式層出不窮,不斷刷新序列化性能的上限,最典型的包括:

  - 專門針對Java語言的:Kryo,F(xiàn)ST等等

  - 跨語言的:Protostuff,ProtoBuf,Thrift,Avro,MsgPack等等

  這些序列化方式的性能多數(shù)都顯著優(yōu)于 hessian2 (甚至包括尚未成熟的dubbo序列化)。所以我們可以為 dubbo 引入 Kryo 和 FST 這兩種高效 Java 來優(yōu)化 dubbo 的序列化。

  使用Kryo和FST非常簡單,只需要在dubbo RPC的XML配置中添加一個屬性即可:

<dubbo:protocol name="dubbo" serialization="kryo"/>

  網(wǎng)絡通信

  dubbo中數(shù)據(jù)格式

  解決socket中數(shù)據(jù)粘包拆包問題,一般有三種方式

  * 定長協(xié)議(數(shù)據(jù)包長度一致)

  * 定長的協(xié)議是指協(xié)議內(nèi)容的長度是固定的,比如協(xié)議byte長度是50,當從網(wǎng)絡上讀取50個byte后,就進行decode解碼操作。定長協(xié)議在讀取或者寫入時,效率比較高,因為數(shù)據(jù)緩存的大小基本都確定了,就好比數(shù)組一樣,缺陷就是適應性不足,以RPC場景為例,很難估計出定長的長度是多少。

  * 特殊結束符(數(shù)據(jù)尾:通過特殊的字符標識#)

  * 相比定長協(xié)議,如果能夠定義一個特殊字符作為每個協(xié)議單元結束的標示,就能夠以變長的方式進行通信,從而在數(shù)據(jù)傳輸和高效之間取得平衡,比如用特殊字符`\n`。特殊結束符方式的問題是過于簡單的思考了協(xié)議傳輸?shù)倪^程,對于一個協(xié)議單元必須要全部讀入才能夠進行處理,除此之外必須要防止用戶傳輸?shù)臄?shù)據(jù)不能同結束符相同,否則就會出現(xiàn)紊亂。

  * 變長協(xié)議(協(xié)議頭+payload模式)

  * 這種一般是自定義協(xié)議,會以定長加不定長的部分組成,其中定長的部分需要描述不定長的內(nèi)容長度。

  * dubbo就是使用這種形式的數(shù)據(jù)傳輸格式

  Dubbo 框架定義了私有的RPC協(xié)議,其中請求和響應協(xié)議的具體內(nèi)容我們使用表格來展示。

1663309170930_3.jpg

  Dubbo 數(shù)據(jù)包分為消息頭和消息體,消息頭用于存儲一些元信息,比如魔數(shù)(Magic),數(shù)據(jù)包類型(Request/Response),消息體長度(Data Length)等。消息體中用于存儲具體的調(diào)用消息,比如方法名稱,參數(shù)列表等。下面簡單列舉一下消息頭的內(nèi)容。

  | 偏移量(Bit) | 字段 | 取值 |

  | ----------- | ------------ | ------------------------------------------------------------ |

  | 0 ~ 7 | 魔數(shù)高位 | 0xda00 |

  | 8 ~ 15 | 魔數(shù)低位 | 0xbb |

  | 16 | 數(shù)據(jù)包類型 | 0 - Response, 1 - Request |

  | 17 | 調(diào)用方式 | 僅在第16位被設為1的情況下有效,0 - 單向調(diào)用,1 - 雙向調(diào)用 |

  | 18 | 事件標識 | 0 - 當前數(shù)據(jù)包是請求或響應包,1 - 當前數(shù)據(jù)包是心跳包 |

  | 19 ~ 23 | 序列化器編號 | 2 - Hessian2Serialization

  3 - JavaSerialization

  4 - CompactedJavaSerialization

  6 - FastJsonSerialization

  7 - NativeJavaSerialization

  8 - KryoSerialization

  9 - FstSerialization |

  | 24 ~ 31 | 狀態(tài) | 20 - OK 30 - CLIENT_TIMEOUT 31 - SERVER_TIMEOUT 40 - BAD_REQUEST 50 - BAD_RESPONSE ...... |

  | 32 ~ 95 | 請求編號 | 共8字節(jié),運行時生成 |

  | 96 ~ 127 | 消息體長度 | 運行時計算

  消費方發(fā)送請求

  (1)發(fā)送請求

  為了便于大家閱讀代碼,這里以 DemoService 為例,將 sayHello 方法的整個調(diào)用路徑貼出來。

proxy0#sayHello(String)
  —> InvokerInvocationHandler#invoke(Object, Method, Object[])
    —> MockClusterInvoker#invoke(Invocation)
      —> AbstractClusterInvoker#invoke(Invocation)
        —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
          —> Filter#invoke(Invoker, Invocation)  // 包含多個 Filter 調(diào)用
            —> ListenerInvokerWrapper#invoke(Invocation)
              —> AbstractInvoker#invoke(Invocation)
                —> DubboInvoker#doInvoke(Invocation)
                  —> ReferenceCountExchangeClient#request(Object, int)
                    —> HeaderExchangeClient#request(Object, int)
                      —> HeaderExchangeChannel#request(Object, int)
                        —> AbstractPeer#send(Object)
                          —> AbstractClient#send(Object, boolean)
                            —> NettyChannel#send(Object, boolean)
                              —> NioClientSocketChannel#write(Object)

  dubbo消費方,自動生成代碼對象如下

public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {

    private InvocationHandler handler;

    public String sayHello(String string) {
        // 將參數(shù)存儲到 Object 數(shù)組中
        Object[] arrobject = new Object[]{string};
        // 調(diào)用 InvocationHandler 實現(xiàn)類的 invoke 方法得到調(diào)用結果
        Object object = this.handler.invoke(this, methods[0], arrobject);
        // 返回調(diào)用結果
        return (String)object;
    }
}

  InvokerInvocationHandler 中的 invoker 成員變量類型為 MockClusterInvoker,MockClusterInvoker 內(nèi)部封裝了服務降級邏輯。下面簡單看一下:

public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
        // 獲取 mock 配置值
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
             // 無 mock 邏輯,直接調(diào)用其他 Invoker 對象的 invoke 方法,
            // 比如 FailoverClusterInvoker
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            // force:xxx 直接執(zhí)行 mock 邏輯,不發(fā)起遠程調(diào)用
            result = doMockInvoke(invocation, null);
        } else {
             // fail:xxx 表示消費方對調(diào)用服務失敗后,再執(zhí)行 mock 邏輯,不拋出異常
            try {
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                 // 調(diào)用失敗,執(zhí)行 mock 邏輯
                result = doMockInvoke(invocation, e);
            }
        }
        return result;
    }

  考慮到前文已經(jīng)詳細分析過 FailoverClusterInvoker,因此本節(jié)略過 FailoverClusterInvoker,直接分析 DubboInvoker。

public abstract class AbstractInvoker<T> implements Invoker<T> {
   
    public Result invoke(Invocation inv) throws RpcException {
        if (destroyed.get()) {
            throw new RpcException("Rpc invoker for service ...");
        }
        RpcInvocation invocation = (RpcInvocation) inv;
        // 設置 Invoker
        invocation.setInvoker(this);
        if (attachment != null && attachment.size() > 0) {
            // 設置 attachment
            invocation.addAttachmentsIfAbsent(attachment);
        }
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            // 添加 contextAttachments 到 RpcInvocation#attachment 變量中
            invocation.addAttachments(contextAttachments);
        }
        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
            // 設置異步信息到 RpcInvocation#attachment 中
            invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        try {
            // 抽象方法,由子類實現(xiàn)
            return doInvoke(invocation);
        } catch (InvocationTargetException e) {
            // ...
        } catch (RpcException e) {
            // ...
        } catch (Throwable e) {
            return new RpcResult(e);
        }
    }

    protected abstract Result doInvoke(Invocation invocation) throws Throwable;
   
    // 省略其他方法
}

  上面的代碼來自 AbstractInvoker 類,其中大部分代碼用于添加信息到 RpcInvocation#attachment 變量中,添加完畢后,調(diào)用 doInvoke 執(zhí)行后續(xù)的調(diào)用。doInvoke 是一個抽象方法,需要由子類實現(xiàn),下面到 DubboInvoker 中看一下。

@Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        //將目標方法以及版本好作為參數(shù)放入到Invocation中
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        //獲得客戶端連接
        ExchangeClient currentClient; //初始化invoker的時候,構建的一個遠程通信連接
        if (clients.length == 1) { //默認
            currentClient = clients[0];
        } else {
            //通過取模獲得其中一個連接
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            //表示當前的方法是否存在返回值
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
            //isOneway 為 true,表示“單向”通信
            if (isOneway) {//異步無返回值
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else { //存在返回值
                //是否采用異步
                AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
                CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
                responseFuture.whenComplete((obj, t) -> {
                    if (t != null) {
                        asyncRpcResult.completeExceptionally(t);
                    } else {
                        asyncRpcResult.complete((AppResponse) obj);
                    }
                });
                RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
                return asyncRpcResult;
            }
        }
        //省略無關代碼
    }

  最終進入到HeaderExchangeChannel#request方法,拼裝Request并將請求發(fā)送出去

public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed tosend request " + request + ", cause: The channel " + this + " is closed!");
        }
        // 創(chuàng)建請求對象
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
        try {
            //NettyClient
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

  (2)請求編碼

  在netty啟動時,我們設置了編解碼器,其中通過ExchangeCodec完成編解碼工作如下:

public class ExchangeCodec extends TelnetCodec {

    // 消息頭長度
    protected static final int HEADER_LENGTH = 16;
    // 魔數(shù)內(nèi)容
    protected static final short MAGIC = (short) 0xdabb;
    protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
    protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
    protected static final byte FLAG_REQUEST = (byte) 0x80;
    protected static final byte FLAG_TWOWAY = (byte) 0x40;
    protected static final byte FLAG_EVENT = (byte) 0x20;
    protected static final int SERIALIZATION_MASK = 0x1f;
    private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);

    public Short getMagicCode() {
        return MAGIC;
    }

    @Override
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        if (msg instanceof Request) {
            // 對 Request 對象進行編碼
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
            // 對 Response 對象進行編碼,后面分析
            encodeResponse(channel, buffer, (Response) msg);
        } else {
            super.encode(channel, buffer, msg);
        }
    }

    protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
        Serialization serialization = getSerialization(channel);

        // 創(chuàng)建消息頭字節(jié)數(shù)組,長度為 16
        byte[] header = new byte[HEADER_LENGTH];

        // 設置魔數(shù)
        Bytes.short2bytes(MAGIC, header);

        // 設置數(shù)據(jù)包類型(Request/Response)和序列化器編號
        header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

        // 設置通信方式(單向/雙向)
        if (req.isTwoWay()) {
            header[2] |= FLAG_TWOWAY;
        }
       
        // 設置事件標識
        if (req.isEvent()) {
            header[2] |= FLAG_EVENT;
        }

        // 設置請求編號,8個字節(jié),從第4個字節(jié)開始設置
        Bytes.long2bytes(req.getId(), header, 4);

        // 獲取 buffer 當前的寫位置
        int savedWriteIndex = buffer.writerIndex();
        // 更新 writerIndex,為消息頭預留 16 個字節(jié)的空間
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        // 創(chuàng)建序列化器,比如 Hessian2ObjectOutput
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        if (req.isEvent()) {
            // 對事件數(shù)據(jù)進行序列化操作
            encodeEventData(channel, out, req.getData());
        } else {
            // 對請求數(shù)據(jù)進行序列化操作
            encodeRequestData(channel, out, req.getData(), req.getVersion());
        }
        out.flushBuffer();
        if (out instanceof Cleanable) {
            ((Cleanable) out).cleanup();
        }
        bos.flush();
        bos.close();
       
        // 獲取寫入的字節(jié)數(shù),也就是消息體長度
        int len = bos.writtenBytes();
        checkPayload(channel, len);

        // 將消息體長度寫入到消息頭中
        Bytes.int2bytes(len, header, 12);

        // 將 buffer 指針移動到 savedWriteIndex,為寫消息頭做準備
        buffer.writerIndex(savedWriteIndex);
        // 從 savedWriteIndex 下標處寫入消息頭
        buffer.writeBytes(header);
        // 設置新的 writerIndex,writerIndex = 原寫下標 + 消息頭長度 + 消息體長度
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    }
   
    // 省略其他方法
}

  以上就是請求對象的編碼過程,該過程首先會通過位運算將消息頭寫入到 header 數(shù)組中。然后對 Request 對象的 data 字段執(zhí)行序列化操作,序列化后的數(shù)據(jù)最終會存儲到 ChannelBuffer 中。序列化操作執(zhí)行完后,可得到數(shù)據(jù)序列化后的長度 len,緊接著將 len 寫入到 header 指定位置處。最后再將消息頭字節(jié)數(shù)組 header 寫入到 ChannelBuffer 中,整個編碼過程就結束了。本節(jié)的最后,我們再來看一下 Request 對象的 data 字段序列化過程,也就是 encodeRequestData 方法的邏輯,如下:

public class DubboCodec extends ExchangeCodec implements Codec2 {
   
    protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
        RpcInvocation inv = (RpcInvocation) data;

        // 依次序列化 dubbo version、path、version
        out.writeUTF(version);
        out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
        out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));

        // 序列化調(diào)用方法名
        out.writeUTF(inv.getMethodName());
        // 將參數(shù)類型轉換為字符串,并進行序列化
        out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
        Object[] args = inv.getArguments();
        if (args != null)
            for (int i = 0; i < args.length; i++) {
                // 對運行時參數(shù)進行序列化
                out.writeObject(encodeInvocationArgument(channel, inv, i));
            }
       
        // 序列化 attachments
        out.writeObject(inv.getAttachments());
    }
}

  至此,關于服務消費方發(fā)送請求的過程就分析完了,接下來我們來看一下服務提供方是如何接收請求的。

  提供方接收請求

  (1) 請求解碼

  這里直接分析請求數(shù)據(jù)的解碼邏輯,忽略中間過程,如下:

public class ExchangeCodec extends TelnetCodec {
   
    @Override
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        int readable = buffer.readableBytes();
        // 創(chuàng)建消息頭字節(jié)數(shù)組
        byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
        // 讀取消息頭數(shù)據(jù)
        buffer.readBytes(header);
        // 調(diào)用重載方法進行后續(xù)解碼工作
        return decode(channel, buffer, readable, header);
    }

    @Override
    protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
        // 檢查魔數(shù)是否相等
        if (readable > 0 && header[0] != MAGIC_HIGH
                || readable > 1 && header[1] != MAGIC_LOW) {
            int length = header.length;
            if (header.length < readable) {
                header = Bytes.copyOf(header, readable);
                buffer.readBytes(header, length, readable - length);
            }
            for (int i = 1; i < header.length - 1; i++) {
                if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                    buffer.readerIndex(buffer.readerIndex() - header.length + i);
                    header = Bytes.copyOf(header, i);
                    break;
                }
            }
            // 通過 telnet 命令行發(fā)送的數(shù)據(jù)包不包含消息頭,所以這里
            // 調(diào)用 TelnetCodec 的 decode 方法對數(shù)據(jù)包進行解碼
            return super.decode(channel, buffer, readable, header);
        }
       
        // 檢測可讀數(shù)據(jù)量是否少于消息頭長度,若小于則立即返回 DecodeResult.NEED_MORE_INPUT
        if (readable < HEADER_LENGTH) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // 從消息頭中獲取消息體長度
        int len = Bytes.bytes2int(header, 12);
        // 檢測消息體長度是否超出限制,超出則拋出異常
        checkPayload(channel, len);

        int tt = len + HEADER_LENGTH;
        // 檢測可讀的字節(jié)數(shù)是否小于實際的字節(jié)數(shù)
        if (readable < tt) {
            return DecodeResult.NEED_MORE_INPUT;
        }
       
        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

        try {
            // 繼續(xù)進行解碼工作
            return decodeBody(channel, is, header);
        } finally {
            if (is.available() > 0) {
                try {
                    StreamUtils.skipUnusedStream(is);
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }
}

  上面方法通過檢測消息頭中的魔數(shù)是否與規(guī)定的魔數(shù)相等,提前攔截掉非常規(guī)數(shù)據(jù)包,比如通過 telnet 命令行發(fā)出的數(shù)據(jù)包。接著再對消息體長度,以及可讀字節(jié)數(shù)進行檢測。最后調(diào)用 decodeBody 方法進行后續(xù)的解碼工作,ExchangeCodec 中實現(xiàn)了 decodeBody 方法,但因其子類 DubboCodec 覆寫了該方法,所以在運行時 DubboCodec 中的 decodeBody 方法會被調(diào)用。下面我們來看一下該方法的代碼。

public class DubboCodec extends ExchangeCodec implements Codec2 {

    @Override
    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        // 獲取消息頭中的第三個字節(jié),并通過邏輯與運算得到序列化器編號
        byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
        // 獲取調(diào)用編號
        long id = Bytes.bytes2long(header, 4);
        // 通過邏輯與運算得到調(diào)用類型,0 - Response,1 - Request
        if ((flag & FLAG_REQUEST) == 0) {
            // 對響應結果進行解碼,得到 Response 對象。這個非本節(jié)內(nèi)容,后面再分析
            // ...
        } else {
            // 創(chuàng)建 Request 對象
            Request req = new Request(id);
            req.setVersion(Version.getProtocolVersion());
            // 通過邏輯與運算得到通信方式,并設置到 Request 對象中
            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
           
            // 通過位運算檢測數(shù)據(jù)包是否為事件類型
            if ((flag & FLAG_EVENT) != 0) {
                // 設置心跳事件到 Request 對象中
                req.setEvent(Request.HEARTBEAT_EVENT);
            }
            try {
                Object data;
                if (req.isHeartbeat()) {
                    // 對心跳包進行解碼,該方法已被標注為廢棄
                    data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                } else if (req.isEvent()) {
                    // 對事件數(shù)據(jù)進行解碼
                    data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                } else {
                    DecodeableRpcInvocation inv;
                    // 根據(jù) url 參數(shù)判斷是否在 IO 線程上對消息體進行解碼
                    if (channel.getUrl().getParameter(
                            Constants.DECODE_IN_IO_THREAD_KEY,
                            Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                        inv = new DecodeableRpcInvocation(channel, req, is, proto);
                        // 在當前線程,也就是 IO 線程上進行后續(xù)的解碼工作。此工作完成后,可將
                        // 調(diào)用方法名、attachment、以及調(diào)用參數(shù)解析出來
                        inv.decode();
                    } else {
                        // 僅創(chuàng)建 DecodeableRpcInvocation 對象,但不在當前線程上執(zhí)行解碼邏輯
                        inv = new DecodeableRpcInvocation(channel, req,
                                new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                    }
                    data = inv;
                }
               
                // 設置 data 到 Request 對象中
                req.setData(data);
            } catch (Throwable t) {
                // 若解碼過程中出現(xiàn)異常,則將 broken 字段設為 true,
                // 并將異常對象設置到 Reqeust 對象中
                req.setBroken(true);
                req.setData(t);
            }
            return req;
        }
    }
}

  如上,decodeBody 對部分字段進行了解碼,并將解碼得到的字段封裝到 Request 中。隨后會調(diào)用 DecodeableRpcInvocation 的 decode 方法進行后續(xù)的解碼工作。此工作完成后,可將調(diào)用方法名、attachment、以及調(diào)用參數(shù)解析出來。

  (2)調(diào)用服務

  解碼器將數(shù)據(jù)包解析成 Request 對象后,NettyHandler 的 messageReceived 方法緊接著會收到這個對象,并將這個對象繼續(xù)向下傳遞。整個調(diào)用棧如下:

NettyServerHandler#channelRead(ChannelHandlerContext, MessageEvent)
  —> AbstractPeer#received(Channel, Object)
    —> MultiMessageHandler#received(Channel, Object)
      —> HeartbeatHandler#received(Channel, Object)
        —> AllChannelHandler#received(Channel, Object)
          —> ExecutorService#execute(Runnable)    // 由線程池執(zhí)行后續(xù)的調(diào)用邏輯

  這里我們直接分析調(diào)用棧中的分析第一個和最后一個調(diào)用方法邏輯。如下:

  考慮到篇幅,以及很多中間調(diào)用的邏輯并非十分重要,所以這里就不對調(diào)用棧中的每個方法都進行分析了。這里我們直接分析最后一個調(diào)用方法邏輯。如下:

public class ChannelEventRunnable implements Runnable {
   
    private final ChannelHandler handler;
    private final Channel channel;
    private final ChannelState state;
    private final Throwable exception;
    private final Object message;
   
    @Override
    public void run() {
        // 檢測通道狀態(tài),對于請求或響應消息,此時 state = RECEIVED
        if (state == ChannelState.RECEIVED) {
            try {
                // 將 channel 和 message 傳給 ChannelHandler 對象,進行后續(xù)的調(diào)用
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("... operation error, channel is ... message is ...");
            }
        }
       
        // 其他消息類型通過 switch 進行處理
        else {
            switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                } catch (Exception e) {
                    logger.warn("... operation error, channel is ...");
                }
                break;
            case DISCONNECTED:
                // ...
            case SENT:
                // ...
            case CAUGHT:
                // ...
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
            }
        }

    }
}

  如上,請求和響應消息出現(xiàn)頻率明顯比其他類型消息高,所以這里對該類型的消息進行了針對性判斷。ChannelEventRunnable 僅是一個中轉站,它的 run 方法中并不包含具體的調(diào)用邏輯,僅用于將參數(shù)傳給其他 ChannelHandler 對象進行處理,該對象類型為 DecodeHandler。

public class DecodeHandler extends AbstractChannelHandlerDelegate {

    public DecodeHandler(ChannelHandler handler) {
        super(handler);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            // 對 Decodeable 接口實現(xiàn)類對象進行解碼
            decode(message);
        }

        if (message instanceof Request) {
            // 對 Request 的 data 字段進行解碼
            decode(((Request) message).getData());
        }

        if (message instanceof Response) {
            // 對 Request 的 result 字段進行解碼
            decode(((Response) message).getResult());
        }

        // 執(zhí)行后續(xù)邏輯
        handler.received(channel, message);
    }

    private void decode(Object message) {
        // Decodeable 接口目前有兩個實現(xiàn)類,
        // 分別為 DecodeableRpcInvocation 和 DecodeableRpcResult
        if (message != null && message instanceof Decodeable) {
            try {
                // 執(zhí)行解碼邏輯
                ((Decodeable) message).decode();
            } catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
                }
            }
        }
    }
}

  DecodeHandler 主要是包含了一些解碼邏輯,完全解碼后的 Request 對象會繼續(xù)向后傳遞

public class DubboProtocol extends AbstractProtocol {

    public static final String NAME = "dubbo";
   
    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        @Override
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                // 獲取 Invoker 實例
                Invoker<?> invoker = getInvoker(channel, inv);
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    // 回調(diào)相關,忽略
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                // 通過 Invoker 調(diào)用具體的服務
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: ...");
        }
       
        // 忽略其他方法
    }
   
    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        // 忽略回調(diào)和本地存根相關邏輯
        // ...
       
        int port = channel.getLocalAddress().getPort();
       
        // 計算 service key,格式為 groupName/serviceName:serviceVersion:port。比如:
        //   dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
        String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));

        // 從 exporterMap 查找與 serviceKey 相對應的 DubboExporter 對象,
        // 服務導出過程中會將 <serviceKey, DubboExporter> 映射關系存儲到 exporterMap 集合中
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

        if (exporter == null)
            throw new RemotingException(channel, "Not found exported service ...");

        // 獲取 Invoker 對象,并返回
        return exporter.getInvoker();
    }
   
    // 忽略其他方法
}

  在之前課程中介紹過,服務全部暴露完成之后保存到exporterMap中。這里就是通過serviceKey獲取exporter之后獲取Invoker,并通過 Invoker 的 invoke 方法調(diào)用服務邏輯

public abstract class AbstractProxyInvoker<T> implements Invoker<T> {

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        try {
            // 調(diào)用 doInvoke 執(zhí)行后續(xù)的調(diào)用,并將調(diào)用結果封裝到 RpcResult 中,并
            return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
        } catch (InvocationTargetException e) {
            return new RpcResult(e.getTargetException());
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method ...");
        }
    }
   
    protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
}

  如上,doInvoke 是一個抽象方法,這個需要由具體的 Invoker 實例實現(xiàn)。Invoker 實例是在運行時通過 JavassistProxyFactory 創(chuàng)建的,創(chuàng)建邏輯如下:

public class JavassistProxyFactory extends AbstractProxyFactory {
   
    // 省略其他方法

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        // 創(chuàng)建匿名類對象
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                // 調(diào)用 invokeMethod 方法進行后續(xù)的調(diào)用
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}

  Wrapper 是一個抽象類,其中 invokeMethod 是一個抽象方法。Dubbo 會在運行時通過 Javassist 框架為 Wrapper 生成實現(xiàn)類,并實現(xiàn) invokeMethod 方法,該方法最終會根據(jù)調(diào)用信息調(diào)用具體的服務。以 DemoServiceImpl 為例,Javassist 為其生成的代理類如下。

/** Wrapper0 是在運行時生成的,大家可使用 Arthas 進行反編譯 */
public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    // 省略其他方法

    public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
        DemoService demoService;
        try {
            // 類型轉換
            demoService = (DemoService)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        try {
            // 根據(jù)方法名調(diào)用指定的方法
            if ("sayHello".equals(string) && arrclass.length == 1) {
                return demoService.sayHello((String)arrobject[0]);
            }
        }
        catch (Throwable throwable) {
            throw new InvocationTargetException(throwable);
        }
        throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString());
    }
}

  到這里,整個服務調(diào)用過程就分析完了。最后把調(diào)用過程貼出來,如下:

ChannelEventRunnable#run()
  —> DecodeHandler#received(Channel, Object)
    —> HeaderExchangeHandler#received(Channel, Object)
      —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
        —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
          —> Filter#invoke(Invoker, Invocation)
            —> AbstractProxyInvoker#invoke(Invocation)
              —> Wrapper0#invokeMethod(Object, String, Class[], Object[])
                —> DemoServiceImpl#sayHello(String)

  提供方返回調(diào)用結果

  服務提供方調(diào)用指定服務后,會將調(diào)用結果封裝到 Response 對象中,并將該對象返回給服務消費方。服務提供方也是通過 NettyChannel 的 send 方法將 Response 對象返回,這里就不在重復分析了。本節(jié)我們僅需關注 Response 對象的編碼過程即可。

public class ExchangeCodec extends TelnetCodec {
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        if (msg instanceof Request) {
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
            // 對響應對象進行編碼
            encodeResponse(channel, buffer, (Response) msg);
        } else {
            super.encode(channel, buffer, msg);
        }
    }
   
    protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
        int savedWriteIndex = buffer.writerIndex();
        try {
            Serialization serialization = getSerialization(channel);
            // 創(chuàng)建消息頭字節(jié)數(shù)組
            byte[] header = new byte[HEADER_LENGTH];
            // 設置魔數(shù)
            Bytes.short2bytes(MAGIC, header);
            // 設置序列化器編號
            header[2] = serialization.getContentTypeId();
            if (res.isHeartbeat()) header[2] |= FLAG_EVENT;
            // 獲取響應狀態(tài)
            byte status = res.getStatus();
            // 設置響應狀態(tài)
            header[3] = status;
            // 設置請求編號
            Bytes.long2bytes(res.getId(), header, 4);

            // 更新 writerIndex,為消息頭預留 16 個字節(jié)的空間
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
            ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
            ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
           
            if (status == Response.OK) {
                if (res.isHeartbeat()) {
                    // 對心跳響應結果進行序列化,已廢棄
                    encodeHeartbeatData(channel, out, res.getResult());
                } else {
                    // 對調(diào)用結果進行序列化
                    encodeResponseData(channel, out, res.getResult(), res.getVersion());
                }
            } else {
                // 對錯誤信息進行序列化
                out.writeUTF(res.getErrorMessage())
            };
            out.flushBuffer();
            if (out instanceof Cleanable) {
                ((Cleanable) out).cleanup();
            }
            bos.flush();
            bos.close();

            // 獲取寫入的字節(jié)數(shù),也就是消息體長度
            int len = bos.writtenBytes();
            checkPayload(channel, len);
           
            // 將消息體長度寫入到消息頭中
            Bytes.int2bytes(len, header, 12);
            // 將 buffer 指針移動到 savedWriteIndex,為寫消息頭做準備
            buffer.writerIndex(savedWriteIndex);
            // 從 savedWriteIndex 下標處寫入消息頭
            buffer.writeBytes(header);
            // 設置新的 writerIndex,writerIndex = 原寫下標 + 消息頭長度 + 消息體長度
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
        } catch (Throwable t) {
            // 異常處理邏輯不是很難理解,但是代碼略多,這里忽略了
        }
    }
}

public class DubboCodec extends ExchangeCodec implements Codec2 {
   
    protected void encodeResponseData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
        Result result = (Result) data;
        // 檢測當前協(xié)議版本是否支持帶有 attachment 集合的 Response 對象
        boolean attach = Version.isSupportResponseAttachment(version);
        Throwable th = result.getException();
       
        // 異常信息為空
        if (th == null) {
            Object ret = result.getValue();
            // 調(diào)用結果為空
            if (ret == null) {
                // 序列化響應類型
                out.writeByte(attach ? RESPONSE_NULL_VALUE_WITH_ATTACHMENTS : RESPONSE_NULL_VALUE);
            }
            // 調(diào)用結果非空
            else {
                // 序列化響應類型
                out.writeByte(attach ? RESPONSE_VALUE_WITH_ATTACHMENTS : RESPONSE_VALUE);
                // 序列化調(diào)用結果
                out.writeObject(ret);
            }
        }
        // 異常信息非空
        else {
            // 序列化響應類型
            out.writeByte(attach ? RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS : RESPONSE_WITH_EXCEPTION);
            // 序列化異常對象
            out.writeObject(th);
        }

        if (attach) {
            // 記錄 Dubbo 協(xié)議版本
            result.getAttachments().put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
            // 序列化 attachments 集合
            out.writeObject(result.getAttachments());
        }
    }
}

  以上就是 Response 對象編碼的過程,和前面分析的 Request 對象編碼過程很相似。如果大家能看 Request 對象的編碼邏輯,那么這里的 Response 對象的編碼邏輯也不難理解,就不多說了。接下來我們再來分析雙向通信的最后一環(huán) —— 服務消費方接收調(diào)用結果。

  消費方接收調(diào)用結果

  服務消費方在收到響應數(shù)據(jù)后,首先要做的事情是對響應數(shù)據(jù)進行解碼,得到 Response 對象。然后再將該對象傳遞給下一個入站處理器,這個入站處理器就是 NettyHandler。接下來 NettyHandler 會將這個對象繼續(xù)向下傳遞,最后 AllChannelHandler 的 received 方法會收到這個對象,并將這個對象派發(fā)到線程池中。這個過程和服務提供方接收請求的過程是一樣的,因此這里就不重復分析了。

  (1)響應數(shù)據(jù)解碼

  響應數(shù)據(jù)解碼邏輯主要的邏輯封裝在 DubboCodec 中,我們直接分析這個類的代碼。如下:

public class DubboCodec extends ExchangeCodec implements Codec2 {

    @Override
    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
        // 獲取請求編號
        long id = Bytes.bytes2long(header, 4);
        // 檢測消息類型,若下面的條件成立,表明消息類型為 Response
        if ((flag & FLAG_REQUEST) == 0) {
            // 創(chuàng)建 Response 對象
            Response res = new Response(id);
            // 檢測事件標志位
            if ((flag & FLAG_EVENT) != 0) {
                // 設置心跳事件
                res.setEvent(Response.HEARTBEAT_EVENT);
            }
            // 獲取響應狀態(tài)
            byte status = header[3];
            // 設置響應狀態(tài)
            res.setStatus(status);
           
            // 如果響應狀態(tài)為 OK,表明調(diào)用過程正常
            if (status == Response.OK) {
                try {
                    Object data;
                    if (res.isHeartbeat()) {
                        // 反序列化心跳數(shù)據(jù),已廢棄
                        data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                    } else if (res.isEvent()) {
                        // 反序列化事件數(shù)據(jù)
                        data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                    } else {
                        DecodeableRpcResult result;
                        // 根據(jù) url 參數(shù)決定是否在 IO 線程上執(zhí)行解碼邏輯
                        if (channel.getUrl().getParameter(
                                Constants.DECODE_IN_IO_THREAD_KEY,
                                Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                            // 創(chuàng)建 DecodeableRpcResult 對象
                            result = new DecodeableRpcResult(channel, res, is,
                                    (Invocation) getRequestData(id), proto);
                            // 進行后續(xù)的解碼工作
                            result.decode();
                        } else {
                            // 創(chuàng)建 DecodeableRpcResult 對象
                            result = new DecodeableRpcResult(channel, res,
                                    new UnsafeByteArrayInputStream(readMessageData(is)),
                                    (Invocation) getRequestData(id), proto);
                        }
                        data = result;
                    }
                   
                    // 設置 DecodeableRpcResult 對象到 Response 對象中
                    res.setResult(data);
                } catch (Throwable t) {
                    // 解碼過程中出現(xiàn)了錯誤,此時設置 CLIENT_ERROR 狀態(tài)碼到 Response 對象中
                    res.setStatus(Response.CLIENT_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
            }
            // 響應狀態(tài)非 OK,表明調(diào)用過程出現(xiàn)了異常
            else {
                // 反序列化異常信息,并設置到 Response 對象中
                res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
            }
            return res;
        } else {
            // 對請求數(shù)據(jù)進行解碼,前面已分析過,此處忽略
        }
    }
}

  以上就是響應數(shù)據(jù)的解碼過程,上面邏輯看起來是不是似曾相識。對的,我們在前面章節(jié)分析過 DubboCodec 的 decodeBody 方法中關于請求數(shù)據(jù)的解碼過程,該過程和響應數(shù)據(jù)的解碼過程很相似。下面,我們繼續(xù)分析調(diào)用結果的反序列化過程

public class DecodeableRpcResult extends AppResponse implements Codec, Decodeable {

    private static final Logger log = LoggerFactory.getLogger(DecodeableRpcResult.class);

    private Channel channel;

    private byte serializationType;

    private InputStream inputStream;

    private Response response;

    private Invocation invocation;

    private volatile boolean hasDecoded;

    public DecodeableRpcResult(Channel channel, Response response, InputStream is, Invocation invocation, byte id) {
        Assert.notNull(channel, "channel == null");
        Assert.notNull(response, "response == null");
        Assert.notNull(is, "inputStream == null");
        this.channel = channel;
        this.response = response;
        this.inputStream = is;
        this.invocation = invocation;
        this.serializationType = id;
    }

    @Override
    public void encode(Channel channel, OutputStream output, Object message) throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
                .deserialize(channel.getUrl(), input);
        // 反序列化響應類型
        byte flag = in.readByte();
        switch (flag) {
            case DubboCodec.RESPONSE_NULL_VALUE:
                break;
            case DubboCodec.RESPONSE_VALUE:
                handleValue(in);
                break;
            case DubboCodec.RESPONSE_WITH_EXCEPTION:
                handleException(in);
                break;
                // 返回值為空,且攜帶了 attachments 集合
            case DubboCodec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
                handleAttachment(in);
                break;
                //返回值不為空,且攜帶了 attachments 集合
            case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS:
                handleValue(in);
                handleAttachment(in);
                break;
            // 異常對象不為空,且攜帶了 attachments 集合
            case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
                handleException(in);
                handleAttachment(in);
                break;
            default:
                throw new IOException("Unknown result flag, expect '0' '1' '2' '3' '4' '5', but received: " + flag);
        }
        if (in instanceof Cleanable) {
            ((Cleanable) in).cleanup();
        }
        return this;
    }

  正常調(diào)用下,線程會進入 RESPONSE_VALUE_WITH_ATTACHMENTS 分支中。然后線程會從 invocation 變量(大家探索一下 invocation 變量的由來)中獲取返回值類型,接著對調(diào)用結果進行反序列化,并將序列化后的結果存儲起來。最后對 attachments 集合進行反序列化,并存到指定字段中。

  異步轉同步

  Dubbo發(fā)送數(shù)據(jù)至服務方后,在通信層面是異步的,通信線程并不會等待結果數(shù)據(jù)返回。而我們在使用Dubbo進行RPC調(diào)用缺省就是同步的,這其中就涉及到了異步轉同步的操作。

  而在2.7.x版本中,這種自實現(xiàn)的異步轉同步操作進行了修改。新的`DefaultFuture`繼承了`CompletableFuture`,新的`doReceived(Response res)`方法如下:

private void doReceived(Response res) {
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    if (res.getStatus() == Response.OK) {
        this.complete(res.getResult());
    } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
    } else {
        this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
    }
}

  通過`CompletableFuture#complete`方法來設置異步的返回結果,且刪除舊的`get()`方法,使用`CompletableFuture#get()`方法:

public T get() throws InterruptedException, ExecutionException {
    Object r;
    return reportGet((r = result) == null ? waitingGet(true) : r);
}

  使用`CompletableFuture`完成了異步轉同步的操作。

  異步多線程數(shù)據(jù)一致

  這里簡單說明一下。一般情況下,服務消費方會并發(fā)調(diào)用多個服務,每個用戶線程發(fā)送請求后,會調(diào)用 get 方法進行等待。 一段時間后,服務消費方的線程池會收到多個響應對象。這個時候要考慮一個問題,如何將每個響應對象傳遞給相應的 Future 對象,不出錯。答案是通過調(diào)用**編號**。Future 被創(chuàng)建時,會要求傳入一個 Request 對象。此時 DefaultFuture 可從 Request 對象中獲取調(diào)用編號,并將 <調(diào)用編號, DefaultFuture 對象> 映射關系存入到靜態(tài) Map 中,即 FUTURES。線程池中的線程在收到 Response 對象后,會根據(jù) Response 對象中的調(diào)用編號到 FUTURES 集合中取出相應的 DefaultFuture 對象,然后再將 Response 對象設置到 DefaultFuture 對象中。這樣用戶線程即可從 DefaultFuture 對象中獲取調(diào)用結果了。整個過程大致如下圖:

  

1663310007996_4.jpg

private DefaultFuture(Channel channel, Request request, int timeout) {
    this.channel = channel;
    this.request = request;
    this.id = request.getId();
    this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
    // put into waiting map.
    FUTURES.put(id, this);
    CHANNELS.put(id, channel);
}

  心跳檢查

  Dubbo采用雙向心跳的方式檢測Client端與Server端的連通性。

  我們再來看看 Dubbo 是如何設計應用層心跳的。Dubbo 的心跳是雙向心跳,客戶端會給服務端發(fā)送心跳,反之,服務端也會向客戶端發(fā)送心跳。

  創(chuàng)建定時器

public class HeaderExchangeClient implements ExchangeClient {

    private final Client client;
    private final ExchangeChannel channel;

    private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(
            new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, TICKS_PER_WHEEL);
   
    private HeartbeatTimerTask heartBeatTimerTask;
    private ReconnectTimerTask reconnectTimerTask;

    public HeaderExchangeClient(Client client, boolean startTimer) {
        Assert.notNull(client, "Client can't be null");
        this.client = client;
        this.channel = new HeaderExchangeChannel(client);

        if (startTimer) {
            URL url = client.getUrl();
            //開啟心跳失敗之后處理重連,斷連的邏輯定時任務
            startReconnectTask(url);
            //開啟發(fā)送心跳請求定時任務
            startHeartBeatTask(url);
        }
    }

  Dubbo 在 `HeaderExchangeClient `初始化時開啟了兩個定時任務

  `startReconnectTask` 主要用于定時發(fā)送心跳請求

  `startHeartBeatTask` 主要用于心跳失敗之后處理重連,斷連的邏輯

  發(fā)送心跳請求

  詳細解析下心跳檢測定時任務的邏輯 `HeartbeatTimerTask#doTask`:

protected void doTask(Channel channel) {
      Long lastRead = lastRead(channel);
      Long lastWrite = lastWrite(channel);
      if ((lastRead != null && now() - lastRead > heartbeat)
          || (lastWrite != null && now() - lastWrite > heartbeat)) {
          Request req = new Request();
          req.setVersion(Version.getProtocolVersion());
          req.setTwoWay(true);
          req.setEvent(Request.HEARTBEAT_EVENT);
          channel.send(req);
      }
   }

  前面已經(jīng)介紹過,**Dubbo 采取的是雙向心跳設計**,即服務端會向客戶端發(fā)送心跳,客戶端也會向服務端發(fā)送心跳,接收的一方更新 lastRead 字段,發(fā)送的一方更新 lastWrite 字段,超過心跳間隙的時間,便發(fā)送心跳請求給對端。這里的 lastRead/lastWrite 同樣會被同一個通道上的普通調(diào)用更新,通過更新這兩個字段,實現(xiàn)了只在連接空閑時才會真正發(fā)送空閑報文的機制,符合我們一開始科普的做法。

  處理重連和斷連

  繼續(xù)研究下重連和斷連定時器都實現(xiàn)了什么 `ReconnectTimerTask#doTask`。

   protected void doTask(Channel channel) {
       Long lastRead = lastRead(channel);
       Long now = now();
       if (!channel.isConnected()) {
           ((Client) channel).reconnect();
           // check pong at client
       } else if (lastRead != null && now - lastRead > idleTimeout) {
           ((Client) channel).reconnect();
       }
    }

  第二個定時器則負責根據(jù)客戶端、服務端類型來對連接做不同的處理,當超過設置的心跳總時間之后,客戶端選擇的是重新連接,服務端則是選擇直接斷開連接。這樣的考慮是合理的,客戶端調(diào)用是強依賴可用連接的,而服務端可以等待客戶端重新建立連接。

  Dubbo 對于建立的每一個連接,同時在客戶端和服務端開啟了 2 個定時器,一個用于定時發(fā)送心跳,一個用于定時重連、斷連,執(zhí)行的頻率均為各自檢測周期的 1/3。定時發(fā)送心跳的任務負責在連接空閑時,向?qū)Χ税l(fā)送心跳包。定時重連、斷連的任務負責檢測 lastRead 是否在超時周期內(nèi)仍未被更新,如果判定為超時,客戶端處理的邏輯是重連,服務端則采取斷連的措施。

分享到:
在線咨詢 我要報名
和我們在線交談!