更新時(shí)間:2022-08-26 來(lái)源:黑馬程序員 瀏覽量:
背景
筆者以前在電商公司,我們需要在8月18號(hào)做大促活動(dòng),我們會(huì)提前一天給所有的用戶推送活動(dòng)信息,且需要根據(jù)用戶畫(huà)像生成不同的推送內(nèi)容。
當(dāng)時(shí)我們總共有80萬(wàn)用戶左右。
經(jīng)測(cè)試,通過(guò)Spring Task和分布式鎖,單臺(tái)機(jī)器同時(shí)開(kāi)啟5個(gè)線程,執(zhí)行時(shí)間需要27個(gè)小時(shí)左右,即便開(kāi)10個(gè)線程,需要14個(gè)小時(shí)左右,顯然執(zhí)行時(shí)間過(guò)長(zhǎng)。
解決方案
當(dāng)時(shí)個(gè)推服務(wù)部署節(jié)點(diǎn)有3臺(tái),在每年大促期間可動(dòng)態(tài)擴(kuò)容,其余的機(jī)器資源沒(méi)有充分利用起來(lái)。
要想短時(shí)間內(nèi)完成推送,那么就得想辦法讓每臺(tái)機(jī)器各自分一部分用戶數(shù)據(jù)去執(zhí)行,這樣效率可提高原來(lái)的N倍。
那么就需要分布式任務(wù)去執(zhí)行,核心思想如下圖:
經(jīng)過(guò)調(diào)研現(xiàn)有的開(kāi)源的分布式任務(wù)調(diào)度框架,決定在elastic-job和xxl-job中選一個(gè)
[Elastic Job](https://github.com/elasticjob)是當(dāng)當(dāng)網(wǎng)開(kāi)源一個(gè)分布式調(diào)度解決方案,由兩個(gè)相互獨(dú)立的子項(xiàng)目Elastic-Job-Lite和Elastic-Job-Cloud組成;定位為輕量級(jí)無(wú)中心化解決方案,使用 jar 包的形式提供分布式任務(wù)的協(xié)調(diào)服務(wù)。支持分布式調(diào)度協(xié)調(diào)、彈性擴(kuò)容縮容、失效轉(zhuǎn)移、錯(cuò)過(guò)執(zhí)行作業(yè)重觸發(fā)、并行調(diào)度、自診斷和修復(fù)等等功能特性。
[XXL-Job官網(wǎng)](https://github.com/xuxueli/xxl-job)是大眾點(diǎn)評(píng)發(fā)布的分布式任務(wù)調(diào)度平臺(tái),其核心設(shè)計(jì)目標(biāo)是開(kāi)發(fā)迅速、學(xué)習(xí)簡(jiǎn)單、輕量級(jí)、易擴(kuò)展?,F(xiàn)已開(kāi)放源代碼并接入多家公司線上產(chǎn)品線,開(kāi)箱即用。
更傾向于選擇XXL-JOB:
1. 輕量級(jí),支持通過(guò)Web頁(yè)面對(duì)任務(wù)進(jìn)行動(dòng)態(tài)CRUD操作,操作簡(jiǎn)單
2. 只依賴數(shù)據(jù)庫(kù)作為集群注冊(cè)中心,接入開(kāi)發(fā)簡(jiǎn)單,不需要ZK
3. 高可用、解耦、高性能、監(jiān)控報(bào)警、分片、重試、故障轉(zhuǎn)移
4. 團(tuán)隊(duì)持續(xù)開(kāi)發(fā),社區(qū)活躍
5. 支持后臺(tái)直接查看每個(gè)任務(wù)執(zhí)行實(shí)時(shí)日志
具體實(shí)現(xiàn)
在項(xiàng)目中集成xxl-job客戶端
<dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> <version>2.2.0</version> </dependency>
在配置文件中配置xxl-job信息
xxl: job: accessToken: admin: addresses: http://xxl部署IP地址:8080/xxl-job-admin executor: appname: vm-service address: ip: port: 9989 logretentiondays: 30
新增XxlJobConfig.java
package com.itheima.config; import com.xxl.job.core.executor.impl.XxlJobSpringExecutor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * xxl-job config * * @author xuxueli 2017-04-28 */ @Configuration @Slf4j public class XxlJobConfig { @Value("${xxl.job.admin.addresses}") private String adminAddresses; @Value("${xxl.job.accessToken}") private String accessToken; @Value("${xxl.job.executor.appname}") private String appname; @Value("${xxl.job.executor.address}") private String address; @Value("${xxl.job.executor.ip}") private String ip; @Value("${xxl.job.executor.port}") private int port; // @Value("${xxl.job.executor.logpath}") // private String logPath; @Value("${xxl.job.executor.logretentiondays}") private int logRetentionDays; @Bean public XxlJobSpringExecutor xxlJobExecutor() { log.info(">>>>>>>>>>> xxl-job config init."); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setAddress(address); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); //xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); return xxlJobSpringExecutor; } }
在xxl-job中新增執(zhí)行器
注冊(cè)方式選自動(dòng)注冊(cè),這樣方便動(dòng)態(tài)擴(kuò)容
創(chuàng)建任務(wù)
路由策略選擇分片廣播
代碼部分
在任務(wù)代碼獲取推送用戶時(shí),根據(jù)當(dāng)前的分片及分片總數(shù)對(duì)用戶ID取余,這樣我們就可以在每個(gè)分片節(jié)點(diǎn),獲取不一樣的數(shù)據(jù)。id值越連續(xù),分片則越均勻。
ShardingUtil.ShardingVO shardingVo = ShardingUtil.getShardingVo(); int numbers = shardingVo.getTotal(); //分片總數(shù) int index = shardingVo.getIndex(); //當(dāng)前分片索引
假設(shè)分片總數(shù)為3,當(dāng)前節(jié)點(diǎn)獲取到的分片索引為0,那么查詢推送用戶SQL如下:
SELECT user_id FROM `user_info` WHERE MOD(user_id,3)=0
注意:我們?cè)趯?shí)際代碼中,分片總數(shù)和當(dāng)前分片索引是以參數(shù)的形式傳給查詢的SQL語(yǔ)句的。
如上,即可完成分布式任務(wù)。
總結(jié)
在某些定時(shí)任務(wù)需要處理大量數(shù)據(jù)的情況下,我們可以通過(guò)引入分布式任務(wù)框架xxl-job,充分利用機(jī)器資源,將需要處理的數(shù)據(jù)均勻的分配到不同的機(jī)器上去執(zhí)行,提高任務(wù)執(zhí)行效率。