<listing id="5x1tx"></listing>
    <track id="5x1tx"></track><pre id="5x1tx"></pre>
    <listing id="5x1tx"><strike id="5x1tx"></strike></listing>

      <del id="5x1tx"></del>

      <noframes id="5x1tx">

      <big id="5x1tx"></big>

      您的位置:首頁 >聚焦 >

      世界速遞!探秘分布式異步通信(1)

      2022-09-23 15:51:03    來源:程序員客棧

      分布式場景下通常會使用同步和異步方式來進行跨進程通信。同步方式較容易理解,它具有通信實時性高的特點。為了滿足系統的高吞吐,產生了異步通信方式。我們重點聊聊分布式場景下的異步通信方式。

      狹義上的同步通信方式是指,當請求發送出去后,在沒有接收到請求的響應之前,調用方用戶線程只能阻塞等待返回,這個時間內做不了別的事情。


      【資料圖】

      相對的,狹義的異步通信方式是指,當請求借助異步通信機制發送出去后,用戶線程可以繼續執行別的操作,并不會阻塞等待應答返回;當請求返回后,會借助通知等方式通知調用者,或者通過回調函數來執行后續的邏輯。

      廣義上的同步通信方式是指,服務調用方發送一個請求,需要等待服務提供方執行完成的結果,否則就不能繼續執行后續邏輯。

      相對應的,廣義的異步通信方式是指,上游的服務調用方只要確保請求消息成功發送就可以返回(我們稱這樣的調用方為消息生產者),繼續執行后續的業務。業務邏輯的執行交給下游的服務(我們稱這樣的服務為消息消費者)。這種異步執行的業務邏輯通常是耗時的長事務,比如說物流發貨、視頻轉碼等業務場景。

      本系列重點關注廣義的異步通信方式,目前主流的異步通信方式分為線程池、隊列以及回調機制。

      1.線程池

      基于線程池能夠實現異步通信。線程池是一種基于池化概念產生的線程集合。

      線程池原理

      線程池是為了避免頻繁的創建銷毀線程為系統帶來額外的內存、CPU壓力,而對線程進行了復用。當需要使用線程時,從池中取出一個使用,用完后再將此線程“返還”給線程池。我們稱這里的線程為“工作線程”,通過下圖來形象表達上述的過程。

      首先,向線程池中提交任務1。此時線程中已經存在6個工作線程,其中一個處于忙碌狀態,正在執行已提交的任務。然后,線程池從剩余的5個空閑工作線程中選擇一個,分配給任務1,開始執行任務1,如下圖所示。此時,工作線程1被分配給任務1,開始執行任務1的業務邏輯,工作線程1的狀態變更為忙碌。

      在任務被執行完成后,工作線程并不忙著被關閉,而是被返回線程池中,狀態仍舊為空閑,方便執行后續到來的任務。

      總結來說,線程池就是將“頻繁的創建新的線程”變更為“從線程池中直接獲取一個線程”;將“在執行任務結束后關閉線程”變更為“向池中歸還線程”。這樣就避免了頻繁創建、銷毀線程帶來的額外時間和空間的開銷。

      使用線程池實現異步通信

      從上面的介紹中我們知道,線程池是通過將任務分配給工作線程,實現對任務的執行的。即,如果要實現對外的接口調用(或者網絡通信),則完全可以在用戶線程中向線程池中提交一個接口調用任務,然后工作線程就可以立即返回執行其他業務操作,待接口調用返回后通知工作線程對結果進行處理即可。

      當用戶線程提交任務到線程池成功后,就接著執行其他業務邏輯了。在程池中工作線程對外部接口的調用返回后,會通知用戶線程繼續返回的結果。

      ?

      實際上,在Java的JUC包中已經實現了這種通知機制,通過線程池+Callable+Future的方式,就能夠很方便、快捷地實現高性能的異步通信機制。而線程池在Java的JUC包中也提供了參考實現,即ThreadPoolExecutor。另外,Java還提供了Executors工具類,以便我們快速創建模板線程池。這些優秀的代碼實現在實戰開發中都得到了廣泛應用。

      ?
      [實戰]使用線程池實現異步通信

      下面通過一段代碼直觀感受一下在Java中使用線程池實現異步通信的過程。

      ?

      實現遠程訂單服務,MockRemoteOrderService.java,用于模擬外部系統的訂單查詢功能。

      ?

      /***@classNameMockRemoteOrderService*@desc模擬遠端訂單查詢服務*/publicclassMockRemoteOrderService{privatestaticfinalLoggerLOGGER=LoggerFactory.getLogger(MockRemoteOrderService.class);publicOrderQueryResponsequeryOrder(OrderQueryRequestorderQueryRequest){try{LOGGER.info("queryOrder開始,orderQueryRequest:{}",JSON.toJSONString(orderQueryRequest));//模擬網絡耗時800msThread.sleep(800);OrderQueryResponseorderQueryResponse=newOrderQueryResponse(orderQueryRequest.getOrderId(),19.99);LOGGER.info("queryOrder成功,orderQueryResponse:{}",JSON.toJSONString(orderQueryResponse));returnorderQueryResponse;}catch(Exceptione){LOGGER.error("queryOrder異常,orderQueryRequest:{}",JSON.toJSONString(orderQueryRequest),e);returnnull;}}}

      通過Thread.sleep(800),模擬網絡耗時。

      ?

      實現訂單查詢請求和響應實體——OrderQueryRequest.java和OrderQueryResponse.java。

      ?

      /***@classNameOrderQueryRequest*@desc訂單查詢請求*/publicclassOrderQueryRequest{/**訂單號*/privateStringorderId;publicOrderQueryRequest(StringorderId){this.orderId=orderId;}省略getter、setter/***@classNameOrderQueryResponse*@desc訂單查詢結果響應*/publicclassOrderQueryResponse{/**訂單號*/privateStringorderId;/**訂單金額*/privateDoubleorderAmount;publicOrderQueryResponse(StringorderId,DoubleorderAmount){this.orderId=orderId;this.orderAmount=orderAmount;}省略getter、setter

      ?

      實現本地訂單查詢業務邏輯LocalBizService.java。

      ?

      /***@classNameLocalBizService*@desc本地業務邏輯*/publicclassLocalBizService{privatestaticfinalLoggerLOGGER=LoggerFactory.getLogger(LocalBizService.class);MockRemoteOrderServicemockRemoteOrderService;/**自定義訂單查詢線程池*/privatestaticfinalExecutorServiceORDER_QUERY_THREAD_POOL=newThreadPoolExecutor(Runtime.getRuntime().availableProcessors()+1,Runtime.getRuntime().availableProcessors()*2,60,TimeUnit.SECONDS,newLinkedBlockingQueue(500),newThreadFactory(){@OverridepublicThreadnewThread(Runnabler){Threadthread=newThread(r,"order-query-thread-pool");thread.setDaemon(true);returnthread;}},newThreadPoolExecutor.CallerRunsPolicy());publicLocalBizService(MockRemoteOrderServicemockRemoteOrderService){this.mockRemoteOrderService=mockRemoteOrderService;}publicvoidexecute(StringorderId){//參數校驗if(StringUtils.isBlank(orderId)){thrownewRuntimeException("orderId為空!orderId:"+orderId);}//組裝請求OrderQueryRequestorderQueryRequest=newOrderQueryRequest(orderId);//提交訂單查詢任務ORDER_QUERY_THREAD_POOL.submit(newRunnable(){@Overridepublicvoidrun(){mockRemoteOrderService.queryOrder(orderQueryRequest);}});LOGGER.info("訂單查詢任務提交成功,orderId:{}",orderId);//其他業務邏輯doSomething();}privatevoiddoSomething(){LOGGER.info("查詢訂單期間繼續執行其他業務邏輯......");}publicstaticvoidmain(String[]args){//實例化mock遠程訂單查詢服務MockRemoteOrderServicemockRemoteOrderService=newMockRemoteOrderService();//實例化本地業務邏輯LocalBizServicelocalBizService=newLocalBizService(mockRemoteOrderService);//提交訂單查詢任務StringorderId="ORDER_"+UUID.randomUUID().toString();localBizService.execute(orderId);while(true){//hold住主線程}}}

      在main()方法中,首先實例化MockRemoteOrderService對象,模擬調用遠程訂單查詢服務;接著實例化LocalBizService對象,通過構造注入MockRemoteOrderService對象,模擬本地業務邏輯;初始化一個訂單Id,通過調用localBizService.execute(orderId)方法,通過線程池提交了一個訂單查詢任務,模擬發起一次異步的遠程通信;在execute()方法中,通過線程池的submit()方法提交了任務到線程池后,繼續執行后續的業務邏輯即doSomething()。

      最后運行main()方法,運行結果如下:

      23:38:59.070[main]INFOcom.snowalker.from.distributed.to.cloudnative.section_1_2_2.async_threadpool.LocalBizService-訂單查詢任務提交成功,orderId:ORDER_98c0e2e1-964a-4f94-9ec6-f50669d700dd23:38:59.075[main]INFOcom.snowalker.from.distributed.to.cloudnative.section_1_2_2.async_threadpool.LocalBizService-查詢訂單期間繼續執行其他業務邏輯......23:38:59.150[order-query-thread-pool]INFOcom.snowalker.from.distributed.to.cloudnative.section_1_2_2.async_threadpool.service.MockRemoteOrderService-queryOrder開始,orderQueryRequest:{"orderId":"ORDER_98c0e2e1-964a-4f94-9ec6-f50669d700dd"}23:38:59.976[order-query-thread-pool]INFOcom.snowalker.from.distributed.to.cloudnative.section_1_2_2.async_threadpool.service.MockRemoteOrderService-queryOrder成功,orderQueryResponse:{"orderAmount":19.99,"orderId":"ORDER_98c0e2e1-964a-4f94-9ec6-f50669d700dd"}

      通過日志打印可以直觀的看到,在任務提交成功后,主線程繼續執行后續業務邏輯,而耗時的訂單查詢任務會通過線程池的工作線程異步執行。

      這就是通過線程池實現異步通信的代碼實現。

      消息隊列

      廣義上的隊列分為進程內的隊列及進程間隊列,例如,Java中的ArrayBlockingQueue、LinkedBlockingQueue等就屬于進程內的隊列;而ActiveMQ、Kafka、RocketMQ、RabbitMQ等就屬于進程間的隊列。我們在日常開發中說的消息隊列,如果沒有特指,往往指的是進程間的隊列。

      那么在分布式系統中,如何基于進程間的消息隊列實現異步通信呢?先看一個實際的案例:這是一個簡化版的分布式電商系統,它的訂單中心中保存了核心的訂單交易數據。在系統建設的初期,商品中心進行庫存扣減時,需要關聯訂單數據;物流中心在發貨時也需要同步訂單數據;支付中心在支付發起時,也需要同步訂單數據。此時的交互過程如圖所示。

      訂單中心直接調用商品中心、物流中心、支付中心提供的數據同步接口,將訂單相關數據同步發送給了其他的系統。這看起來很正常,也沒有什么問題發生。

      隨著系統規模逐漸增長,又有更多的系統需要獲取訂單數據進行各自的業務操作和數據分析,比如:風控中心需要通過訂單數據進行風控相關的操作;倉儲中心需要通過分析訂單數據對貨品管理進行調控;廣告投放中心需要通過訂單數據對廣告轉化率進行計算和優化;數據中心需要通過對訂單數據進行匯總清洗,產出報表和大盤;財務中心需要通過訂單數據建立起實時的結算體系等。

      于是訂單中心逐步增加了對其他系統的數據同步的代碼,用以支持它們各自的需求。

      由于采用的是同步接口對接方式進行數據同步,因此訂單中心中不得不對新增的代碼添加異常處理代碼,防止因為下游服務異常而導致訂單中心自己的業務出現級聯影響。隨著業務逐步發展,下游系統也在進行改造,下游系統的數據同步接口一旦發生修改,比如增加字段、變更服務地址等,都需要通知訂單中心進行代碼修改,一時間訂單中心的研發人員苦不堪言。

      實際的業務場景中,訂單中心面臨的下游系統不只圖上的這些。在大型互聯網公司中,下游系統本身也是一個復雜的分布式系統,其中包含了數十上百的子服務。這樣一來,訂單中心需要對接的系統可能會達到數百上千個,甚至更多。這其中的調用復雜關系可想而知,代碼的復雜程度也難以想象,一個數據同步接口中動輒幾千行代碼不足為奇。

      長期發展下去,勢必會造成線上問題頻出,況且數據同步接口并非核心業務邏輯,但是卻需要投入大量的成本去維護,降低了上下游的迭代效率,還使得開發運維人員疲于奔命,對于個人亦或者企業而言都是不值得的。而這一切的根源都是因為采用了接口同步調用的方式去傳遞訂單數據。而同步調用本身就是一種強耦合的通信方式。

      那么改造的思路就顯而易見,就是想辦法優化數據傳輸方式,降低系統間的耦合度。改造思路就是通過使用消息隊列來實現系統之間的松耦合。

      上圖所示,訂單中心不再像之前那樣通過調用下游各個中心的訂單同步接口,而是直接將訂單同步消息發送到MQ消息隊列(如Kakfa、RocketMQ等)中的訂單同步Topic。下游的中心如果對訂單消息感興趣,則自行訂閱該Topic,拉取消息進行消費即可實現訂單同步。一旦不需要同步訂單消息,則下游的中心主動取消訂閱Topic即可,上游的訂單中心完全不需要感知下游如何去消費消息。而且一旦又有新的下游的服務也需要進行訂單同步,實現新的業務邏輯,則該中心只需要實現消費訂單消息的邏輯即可,直接與MQ消息隊列進行交互,同樣不需要上游的訂單消息進行感知。

      對于上游的訂單消息而言,它要做的就是專注于將訂單同步消息生產出來并發送(投遞)到MQ消息隊列,并保證消息發送成功。實現了與下游各種中心的松耦合,則在代碼量大幅度減少的同時,訂單中心的穩定性得到了提高。原先訂單中心需要保證訂單同步請求被下游的系統接收并處理完成才能繼續后續的操作。

      假設調用每個服務的平均響應時長是100ms(實際情況中可能會由于網絡擁塞變得時間更長,這里是一種理想化的情況),那么上圖中的8個中心同步數據的總時長為100ms × 8 = 800ms。而通過MQ消息隊列優化后的平均響應時長能夠降低數十毫秒,比如20ms。這是因為,消息投遞本身是一個高性能的操作,只要保證消息發送成功并被MQ消息隊列接收并持久化即可。后續的消費者對消息的消費過程,上游的發送者完全不用關注。

      即當訂單中心在邏輯0中完成發送訂單同步消息的操作后,就可以繼續執行邏輯1,邏輯2等后續的業務邏輯。

      對于整個鏈路而言,訂單同步過程就變成了一種異步的通信方式,這樣降低了上游業務邏輯執行耗時,解耦了上下游之間的交互過程,提升了系統的處理能力和吞吐量。對于用戶而言,等待時長也明顯變短,提升了用戶體驗。

      [實戰]使用消息隊列實現異步訂單同步

      下面通過一段代碼模擬改造后的場景,直觀地體驗一下如何通過消息隊列實現異步通信的目的。這里使用到的消息隊列是RocketMQ。關于RocketMQ的搭建和原理會在后續的章節中詳細展開,這里主要是展示具體的使用方法。

      ?

      定義一個訂單同步類,它的屬性是需要通過消息發送方式異步同步給下游各種中心.

      ?

      /***@classNameOrderInfo*@desc訂單同步實體*/publicclassOrderSyncInfo{/**訂單id*/privateStringorderId;/**訂單金額*/privateDoubleorderAmount;/**支付金額*/privateDoublepayAmount;/**優惠券id*/privatelongvoucherId;/**產品id*/privatelongproductId;/**商品名稱*/privateStringproductName;/**創建時間*/privateDatecreateTime;/**發貨時間*/privateDatedeliverTime;publicOrderSyncInfo(StringorderId,DoubleorderAmount,DoublepayAmount,longvoucherId,longproductId,StringproductName,DatecreateTime,DatedeliverTime){this.orderId=orderId;this.orderAmount=orderAmount;this.payAmount=payAmount;this.voucherId=voucherId;this.productId=productId;this.productName=productName;this.createTime=createTime;this.deliverTime=deliverTime;}省略gettersetter

      在上面的代碼中,通過構造方法傳遞對應的訂單屬性,即可實現對訂單同步對象的初始化。

      ?

      編寫一個訂單同步生產者類OrderInfoSyncProducer.java,用以發送訂單同步消息。

      ?

      /***@classNameOrderInfoSyncProducer*@desc訂單信息同步生產者*/publicclassOrderInfoSyncProducer{privateDefaultMQProducerproducer;publicOrderInfoSyncProducer(StringproducerGroup){//初始化生產者實例并指定生產者組producer=newDefaultMQProducer(producerGroup);//NameServer地址producer.setNamesrvAddr("192.168.1.108");try{producer.start();}catch(MQClientExceptione){thrownewRuntimeException("DefaultMQProduceriniterror!",e);}}/***發送消息*@paramorderSyncInfo*@return*/publicSendResultsend(OrderSyncInfoorderSyncInfo){//序列化訂單對象為字符串消息體StringmessageBody=JSON.toJSONString(orderSyncInfo);try{//構造Message對象Messagemessage=newMessage("order_info_sync_topic"/*Topic*/,"sync"/*Tag*/,messageBody.getBytes(RemotingHelper.DEFAULT_CHARSET));//發送消息SendResultsendResult=producer.send(message);returnsendResult;}catch(Exceptione){thrownewRuntimeException("sendorderInfosyncmessageerror!orderInfo:"+messageBody,e);}}/***關閉生產者*/publicvoidshutdown(){producer.shutdown();}}

      通過構造方法,傳遞了生產者組。一般生產者組都會帶業務屬性,方便維護;同時指定了NameServer地址,用于發現broker地址;提供了send()方法,用于將OrderSyncInfo對象序列化為JSON格式的消息體,通過Message對象包裝后指定要發送到消息隊列的Topic,通過producer的send()方法發送出去。
      ?

      編寫訂單同步消費者的代碼。不同的中心均可以通過該類實現對訂單的同步邏輯。

      ?

      /***@classNameOrderInfoSyncConsumer*@desc訂單信息同步消費者*/publicclassOrderInfoSyncConsumer{privateDefaultMQPushConsumerconsumer;privateStringconsumerGroup;publicOrderInfoSyncConsumer(StringconsumerGroup){this.consumerGroup=consumerGroup;//根據consumerGroup初始化DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer(consumerGroup);//集群消費模式consumer.setMessageModel(MessageModel.CLUSTERING);//從哪里開始消費,此處為從offset頭部開始consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//NameServer地址consumer.setNamesrvAddr("192.168.1.108");//訂閱主題try{consumer.subscribe("order_info_sync_topic","*");}catch(MQClientExceptione){thrownewRuntimeException("subscribetopicerror!",e);}//注冊消息消費回調,用以執行消費邏輯consumer.registerMessageListener(newMessageListenerConcurrently(){@OverridepublicConsumeConcurrentlyStatusconsumeMessage(Listmsgs,ConsumeConcurrentlyContextcontext){System.out.printf("%sReceiveNewMessages:%s%n",Thread.currentThread().getName(),msgs);returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});}publicvoidstart(){try{consumer.start();}catch(MQClientExceptione){thrownewRuntimeException("startconsumererror!consumerGroup:"+consumerGroup,e);}System.out.printf("ConsumerStarted.consumerGroup:%s",consumerGroup);}}

      首先通過構造方法傳遞當前消費者實例的消費者組consumerGroup,通過consumerGroup對DefaultMQPushConsumer進行實例化;為DefaultMQPushConsumer設置各種屬性,包括:從何處開始消費、消費模式、NameServer地址、注冊消息消費回調接口(案例中通過匿名內部類實現默認的消費邏輯)、通過subscribe訂閱訂單同步主題;consumeMessage方法即為核心的消費業務邏輯;通過調用DefaultMQPushConsumer的start方法啟動消費者,并開始執行消費。

      最后看一下代碼具體是如何使用的。

      /***@classNameClient*@desc訂單同步測試類*/publicclassClient{publicstaticvoidmain(String[]args){//生產者發送訂單同步消息OrderInfoSyncProducerorderInfoSyncProducer=newOrderInfoSyncProducer("order_info_sync_group");OrderSyncInfoorderSyncInfo=newOrderSyncInfo("order_"+UUID.randomUUID().toString(),19.99,19.99,100001,200001,"IPhone11手機殼",newDate(System.currentTimeMillis()),newDate(System.currentTimeMillis()));//發送消息orderInfoSyncProducer.send(orderSyncInfo);//訂單中心消費訂單同步消息OrderInfoSyncConsumerorderCenterConsumer=newOrderInfoSyncConsumer("order_center_sync_group");orderCenterConsumer.start();//數據中心消費訂單同步消息OrderInfoSyncConsumerdataCenterConsumer=newOrderInfoSyncConsumer("data_center_sync_group");dataCenterConsumer.start();//倉儲中心消費訂單同步消息OrderInfoSyncConsumerstorageCenterConsumer=newOrderInfoSyncConsumer("storage_center_sync_group");storageCenterConsumer.start();}}

      下面對上一段代碼中的main()方法進行簡單的講解:

      初始化了生產組名為“order_info_sync_group”的訂單同步生產者實例。這樣在構造方法執行完成后會自行調用start啟動生產者;實例化了一個訂單同步對象。通過構造方法賦值屬性,可以看到該訂單同步實體為一個IPhone11手機殼。調用send()方法發送消息。實例化了orderCenterConsumer(訂單中心訂單同步消費者)、dataCenterConsumer(數據中心訂單同步消費者)、storageCenterConsumer(倉儲中心訂單同步消費者)等消費者實例。在構造方法初始化過程中,每個消費者組會分別訂閱訂單同步主題,并分別開啟消費,互不影響,這與RocketMQ消費消息的機制有關,此處不詳細展開。最后分別調用各自的start()方法開啟消費。

      通過代碼案例可以發現,下游的各種中心只要實現自己的OrderInfoSyncConsumer邏輯并開啟消費,就能夠實現在上游生產者不耦合下游消費者的前提下,完成跨多個系統的異步訂單同步功能。這就是消息隊列的解耦特性在實戰應用的體現。

      通過圖例分析、案例解析結合代碼,我們對消息隊列的異步通信方式建立起了一個感性的認知。

      下一篇中,我們將繼續對異步通信中的回調機制進行學習。敬請期待。

      關鍵詞: 異步通信 繼續執行 構造方法

      相關閱讀

      女省委书记被征服
      <listing id="5x1tx"></listing>
        <track id="5x1tx"></track><pre id="5x1tx"></pre>
        <listing id="5x1tx"><strike id="5x1tx"></strike></listing>

          <del id="5x1tx"></del>

          <noframes id="5x1tx">

          <big id="5x1tx"></big>