在一家傳媒公司做網(wǎng)站編輯 如何搜索引擎優(yōu)化的完整過(guò)程
分片技術(shù)方案
概述
XXL-JOB
并不直接提供數(shù)據(jù)處理的功能,它只會(huì)給所有注冊(cè)的執(zhí)行器分配好分片序號(hào),在向執(zhí)行器下發(fā)任務(wù)調(diào)度的同時(shí)攜帶分片總數(shù)和當(dāng)前分片序號(hào)
等參數(shù)
設(shè)計(jì)作業(yè)分片方案
保證多個(gè)執(zhí)行器之間不會(huì)查詢到重復(fù)的任務(wù),保證任務(wù)不會(huì)重復(fù)執(zhí)行
- 任務(wù)添加成功后,這些要處理的任務(wù)都會(huì)添加到
待處理任務(wù)表
中,然后啟動(dòng)的多個(gè)執(zhí)行器實(shí)例會(huì)去查詢并處理這些待處理任務(wù) - 每個(gè)執(zhí)行器從任務(wù)列表獲取任務(wù)時(shí)可以讓
任務(wù)id模上分片總數(shù)
,取余結(jié)果對(duì)應(yīng)需要執(zhí)行該任務(wù)執(zhí)行器的分片序號(hào)
,每個(gè)執(zhí)行器查詢的任務(wù)都是唯一的
任務(wù)冪等性
基于作業(yè)分片方案
可以保證每一個(gè)執(zhí)行器查詢到的待處理任務(wù)不會(huì)重復(fù),但對(duì)于同一個(gè)執(zhí)行器
并不能保證其不會(huì)重復(fù)處理其領(lǐng)取到的任務(wù)`
一個(gè)執(zhí)行器正在處理的調(diào)度任務(wù)還沒(méi)有完成時(shí),此時(shí)調(diào)度中心可能又下發(fā)了一次任務(wù)調(diào)度請(qǐng)求,此時(shí)為了保證執(zhí)行器不重復(fù)處理同一個(gè)任務(wù)需要進(jìn)行一些配置
策略 | 選項(xiàng) |
---|---|
調(diào)度過(guò)期策略,調(diào)度中心錯(cuò)過(guò)調(diào)度時(shí)間的補(bǔ)償處理策略 | 忽略 :調(diào)度過(guò)期后忽略過(guò)期的任務(wù),從當(dāng)前時(shí)間開始重新計(jì)算下次觸發(fā)時(shí)間立即執(zhí)行一次(可能重復(fù)執(zhí)行相同的任務(wù)) :調(diào)度過(guò)期后立即執(zhí)行一次,從當(dāng)前時(shí)間開始重新計(jì)算下次觸發(fā)時(shí)間 |
阻塞處理策略,調(diào)度過(guò)于密集即當(dāng)前執(zhí)行器正在執(zhí)行任務(wù)還沒(méi)有結(jié)束時(shí)來(lái)不及處理時(shí)的處理策略 | 單機(jī)串行(默認(rèn)) :調(diào)度請(qǐng)求進(jìn)入單機(jī)執(zhí)行器后,調(diào)度請(qǐng)求進(jìn)入FIFO隊(duì)列并以串行方式運(yùn)行丟棄后續(xù)調(diào)度 :調(diào)度請(qǐng)求進(jìn)入單機(jī)執(zhí)行器后,發(fā)現(xiàn)執(zhí)行器存在運(yùn)行的調(diào)度任務(wù),本次請(qǐng)求將會(huì)被丟棄并標(biāo)記為失敗覆蓋之前調(diào)度(可能重復(fù)執(zhí)行任務(wù)) :調(diào)度請(qǐng)求進(jìn)入單機(jī)執(zhí)行器后,發(fā)現(xiàn)執(zhí)行器存在運(yùn)行的調(diào)度任務(wù),將會(huì)終止運(yùn)行中的調(diào)度任務(wù)并清空隊(duì)列,然后運(yùn)行本地調(diào)度任務(wù) |
基于以上配置還是無(wú)法保同一個(gè)執(zhí)行器
不會(huì)重復(fù)執(zhí)行任務(wù),因?yàn)槲覀冸m然配置了忽略任務(wù),但等到下次觸發(fā)時(shí)間時(shí)可能還會(huì)執(zhí)行相同的任務(wù)
任務(wù)的冪等性
:對(duì)于數(shù)據(jù)的操作不論多少次最終結(jié)果始終是一致的,如處理視頻轉(zhuǎn)碼業(yè)務(wù)時(shí)不論任務(wù)調(diào)度多少次,同一個(gè)視頻只會(huì)執(zhí)行一次成功的轉(zhuǎn)碼
- 執(zhí)行過(guò)的任務(wù)可以打一個(gè)狀態(tài)標(biāo)記已完成,下次再次調(diào)度該任務(wù)時(shí)如果該任務(wù)已完成就不再執(zhí)行
冪等性
: 一次和多次請(qǐng)求某一個(gè)資源時(shí)對(duì)于資源(如視頻)
本身應(yīng)該具有同樣的結(jié)果,即使重復(fù)調(diào)度處理相同的任務(wù)也不能重復(fù)處理相同的視頻
-
場(chǎng)景
: 重復(fù)提交問(wèn)題,如惡意刷單,重復(fù)支付等問(wèn)題,如無(wú)論執(zhí)行添加語(yǔ)句多少次最終只會(huì)向數(shù)據(jù)庫(kù)中插入一條記錄 -
數(shù)據(jù)庫(kù)約束
:比如唯一索引,主鍵 -
樂(lè)觀鎖
:常用于數(shù)據(jù)庫(kù),更新數(shù)據(jù)時(shí)根據(jù)樂(lè)觀鎖狀態(tài)去更新 -
唯一序列號(hào)
:操作時(shí)傳遞一個(gè)唯一序列號(hào), 如在Redis中存儲(chǔ)一個(gè)序列號(hào)當(dāng)?shù)谝淮尾僮魍瓿珊缶蛣h除該序列號(hào),下回操作時(shí)由于獲取不到該序列號(hào)就無(wú)法操作
實(shí)現(xiàn)視頻處理的冪等性:執(zhí)行器接收調(diào)度請(qǐng)求去執(zhí)行視頻處理任務(wù)時(shí)需要先判斷該視頻是否處理完成
,如果處理中或處理成功
則不再處理
- 在數(shù)據(jù)庫(kù)
視頻處理表
中添加處理狀態(tài)
字段,視頻處理完成后更新status
字段的值,執(zhí)行器執(zhí)行任務(wù)前會(huì)先判斷視頻的處理狀態(tài) - 隨著任務(wù)的累計(jì),視頻處理表中的記錄可能會(huì)越來(lái)越多,此時(shí)我們可以將
處理成功的任務(wù)
轉(zhuǎn)移到任務(wù)處理歷史表(結(jié)構(gòu)一樣)
中,提高執(zhí)行器每次查詢?nèi)蝿?wù)的速度
分布式鎖
通過(guò)每個(gè)執(zhí)行器從任務(wù)列表獲取任務(wù)時(shí)讓任務(wù)id模上分片總數(shù)
,取余結(jié)果對(duì)應(yīng)需要執(zhí)行該任務(wù)執(zhí)行器的分片序號(hào),該方式理論上每個(gè)執(zhí)行器分到的任務(wù)是不重復(fù)的
由于任務(wù)調(diào)度中心支持執(zhí)行器彈性擴(kuò)容的機(jī)制,所以無(wú)法絕對(duì)避免任務(wù)不重復(fù)執(zhí)行,此時(shí)需要給每個(gè)任務(wù)配一把鎖,只有獲取到鎖的線程才能執(zhí)行任務(wù)
- 如原來(lái)有四個(gè)執(zhí)行器正在執(zhí)行任務(wù),此時(shí)
0、1號(hào)執(zhí)行器
正在執(zhí)行視頻處理任務(wù),但由于網(wǎng)絡(luò)問(wèn)題無(wú)法與調(diào)度中心通信,此時(shí)調(diào)度中心就會(huì)認(rèn)為執(zhí)行器個(gè)數(shù)減少了 - 調(diào)度中心就會(huì)對(duì)執(zhí)行器重新編號(hào),那么原來(lái)的
3、4執(zhí)行器
編號(hào)就會(huì)變成0、1
,他們就會(huì)查詢并執(zhí)行和0、1號(hào)執(zhí)行器相同的任務(wù)
同步鎖
:為了避免多線程去爭(zhēng)搶同一個(gè)任務(wù)可以使用synchronized
同步鎖去解決
- 缺點(diǎn):synchronized只能保證同一臺(tái)計(jì)算機(jī)中的多個(gè)線程去爭(zhēng)搶同一把鎖
synchronized(鎖對(duì)象){ // 執(zhí)行任務(wù)...
}
分布式鎖
:如果多個(gè)執(zhí)行器分布式部署即多臺(tái)計(jì)算機(jī),此時(shí)需要每臺(tái)計(jì)算機(jī)上的所有線程爭(zhēng)搶(共用)同一把鎖(分布式鎖),保證同一個(gè)視頻只有一個(gè)執(zhí)行器去處理
分布式鎖是由一個(gè)單獨(dú)的程序
提供加鎖、解鎖服務(wù),實(shí)現(xiàn)的方案有很多
-
基于數(shù)據(jù)庫(kù)實(shí)現(xiàn)分布式鎖
:利用數(shù)據(jù)庫(kù)主鍵的唯一性或利用數(shù)據(jù)庫(kù)唯一索引、行級(jí)鎖的特點(diǎn)- 多個(gè)線程同時(shí)向數(shù)據(jù)庫(kù)表中插入一條主鍵相同的記錄,哪個(gè)線程插入成功就代表哪個(gè)線程獲取到鎖
- 多個(gè)線程同時(shí)去更新相同的記錄,誰(shuí)哪個(gè)線程更新成功就代表哪個(gè)線程搶到鎖
-
基于redis實(shí)現(xiàn)分布式鎖
: 基于setnx key value
和set key value nx命令
和redisson
框架等方案- 添加一個(gè)String類型的鍵值對(duì),前提是這個(gè)key不存在否則不執(zhí)行,多個(gè)線程設(shè)置同一個(gè)key只會(huì)有一個(gè)線程設(shè)置成功,設(shè)置成功的的線程拿到鎖
-
使用zookeeper實(shí)現(xiàn)分布式鎖(結(jié)構(gòu)類似文件目錄)
:多線程向zookeeper中創(chuàng)建一個(gè)子目錄(節(jié)點(diǎn))時(shí)只會(huì)有一個(gè)創(chuàng)建成功,誰(shuí)創(chuàng)建該結(jié)點(diǎn)成功誰(shuí)就 獲得鎖
操作視頻待處理任務(wù)
上傳視頻成功后向視頻待處理任務(wù)表(media_process)
添加視頻待處理任務(wù)記錄,上傳視頻和添加待處理任務(wù)
這兩個(gè)操作需要保證事務(wù)的一致性
添加待處理任務(wù)
上傳視頻成功后需要向視頻待處理任務(wù)表
添加視頻待處理任務(wù)記錄,這里暫時(shí)只處理avi格式
的視頻,對(duì)于其他格式的文件不會(huì)添加待處理任務(wù)記錄
- 因?yàn)樯蟼饕曨l成功后一定會(huì)將上傳文件的信息添加到
media_files
文件信息表,所以我們可以將添加文件信息和添加待處理任務(wù)記錄
的操作控制在一個(gè)事務(wù)中
視頻上傳完后在addMediaFilesToDb
方法中編寫addWaitingTask
方法添加待處理任務(wù),然后前后端測(cè)試上傳4個(gè)avi視頻,觀察待處理任務(wù)表是否存在任務(wù)記錄
@Transactional
public MediaFiles addMediaFilesToDb(Long companyId, String fileMd5, UploadFileParamsDto uploadFileParamsDto, String bucket, String objectName) {// 從數(shù)據(jù)庫(kù)查詢文件MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5);if (mediaFiles == null) {mediaFiles = new MediaFiles();// 拷貝基本信息BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles);mediaFiles.setId(fileMd5);mediaFiles.setFileId(fileMd5);mediaFiles.setCompanyId(companyId);// 媒體類型mediaFiles.setUrl("/" + bucket + "/" + objectName);mediaFiles.setBucket(bucket);mediaFiles.setFilePath(objectName);mediaFiles.setCreateDate(LocalDateTime.now());mediaFiles.setAuditStatus("002003");mediaFiles.setStatus("1");// 保存上傳的文件信息到文件信息表int insert = mediaFilesMapper.insert(mediaFiles);if (insert < 0) {log.error("保存文件信息到數(shù)據(jù)庫(kù)失敗,{}", mediaFiles.toString());XueChengPlusException.cast("保存文件信息失敗");}// 添加待處理任務(wù)到待處理任務(wù)表addWaitingTask(mediaFiles);log.debug("保存文件信息到數(shù)據(jù)庫(kù)成功,{}", mediaFiles.toString());}return mediaFiles;}
/*** 添加待處理任務(wù)記錄* @param mediaFiles 媒資文件信息*/
private void addWaitingTask(MediaFiles mediaFiles){// 文件名稱String filename = mediaFiles.getFilename();// 文件擴(kuò)展名String extension = filename.substring(filename.lastIndexOf("."));// 文件mimeTypeString mimeType = getMimeType(extension);// 如果是avi視頻添加到視頻待處理表if(mimeType.equals("video/x-msvideo")){MediaProcess mediaProcess = new MediaProcess();BeanUtils.copyProperties(mediaFiles,mediaProcess);mediaProcess.setStatus("1");// 1表示未處理mediaProcess.setFailCount(0);// 失敗次數(shù)默認(rèn)為0// 設(shè)置url為nullmediaProcess.setUrl(null);int processInsert = mediaProcessMapper.insert(mediaProcess);if (processInsert <= 0) {XueChengPlusException.cast("保存avi視頻到待處理表失敗");}}
}
查詢待處理任務(wù)
在MediaProcessMapper
中編寫根據(jù)分片參數(shù)獲取待處理任務(wù)的DAO方法,保證各個(gè)執(zhí)行器查詢到的待處理任務(wù)記錄不重復(fù)
- 用
任務(wù)id
對(duì)分片總數(shù)
取模
,如果等于該執(zhí)行器的分片序號(hào)
則執(zhí)行 - 同時(shí)為了避免同一個(gè)任務(wù)被同一個(gè)執(zhí)行器執(zhí)行兩次,我們需要額外指定任務(wù)狀態(tài)為
未處理(status = 1)
或處理失敗但處理次數(shù)小于3
public interface MediaProcessMapper extends BaseMapper<MediaProcess> {/*** @description 根據(jù)分片參數(shù)獲取待處理任務(wù)* @param shardTotal 分片總數(shù)* @param shardindex 分片序號(hào)* @param count 任務(wù)數(shù)*/@Select("select * from media_process t where t.id % #{shardTotal} = #{shardIndex} and (t.status = '1' or t.status = '3') and t.fail_count < 3 limit #{count}")List<MediaProcess> selectListByShardIndex(@Param("shardTotal") int shardTotal,@Param("shardIndex") int shardIndex,@Param("count") int count);
}
編寫MediaFileProcessService
接口及其實(shí)現(xiàn)類查詢待處理任務(wù)表
中的的待處理任務(wù),指定分片參數(shù)
和獲取記錄數(shù)(不能超過(guò)cpu核心數(shù))
public interface MediaFileProcessService {/*** @description 獲取待處理任務(wù)* @param shardIndex 分片序號(hào)* @param shardTotal 分片總數(shù)* @param count 獲取記錄數(shù)* @return 待處理任務(wù)集合*/public List<MediaProcess> getMediaProcessList(int shardIndex,int shardTotal,int count);
}
@Slf4j
@Service
public class MediaFileProcessServiceImpl implements MediaFileProcessService {@AutowiredMediaProcessMapper mediaProcessMapper;@Overridepublic List<MediaProcess> getMediaProcessList(int shardIndex, int shardTotal, int count) {List<MediaProcess> mediaProcesses = mediaProcessMapper.selectListByShardIndex(shardTotal, shardIndex, count);return mediaProcesses;}
}
基于數(shù)據(jù)庫(kù)方式實(shí)現(xiàn)分布鎖
當(dāng)一個(gè)線程開始執(zhí)行視頻處理任務(wù)時(shí)將任務(wù)記錄的status
字段的值更新為4表示處理中
悲觀鎖
: 悲觀鎖比較適合插入數(shù)據(jù),簡(jiǎn)單粗暴但是性能一般樂(lè)觀鎖
: 比較適合更新數(shù)據(jù), 性能好但是成功率低(多個(gè)線程同時(shí)執(zhí)行時(shí)只有一個(gè)可以執(zhí)行成功),還需要訪問(wèn)數(shù)據(jù)庫(kù)造成數(shù)據(jù)庫(kù)壓力過(guò)大
# 多個(gè)線程去執(zhí)行該sql都將會(huì)執(zhí)行成功update media_process m set m.status='4' where m.id=?# 版本號(hào)法,在表中增加一個(gè)version字段,更新時(shí)判斷是否等于某個(gè)版本,等于則更新否則更新失敗update t1 set t1.data1 = '',t1.version='2' where t1.version='1'# 自定義版本號(hào)字段status,多個(gè)線程執(zhí)行該SQL時(shí)只有一個(gè)線程成功執(zhí)行,2表示處理成功不用查詢update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=?# 更新失敗重試,嘗試增加版本號(hào)字段的值update t1 set t1.count = count+1,t1.version='2' where t1.version='1'update t1 set t1.count = count+1,t1.version='3' where t1.version='2'
在MediaProcessMapper
中定義方法,基于樂(lè)觀鎖的原理實(shí)現(xiàn)分布式鎖,保證最終只有一個(gè)線程可以成功執(zhí)行SQL即獲取到鎖
public interface MediaProcessMapper extends BaseMapper<MediaProcess> {/*** 開啟一個(gè)任務(wù),只要搶到鎖的線程才能開啟任務(wù)* @param id 任務(wù)id* @return 更新記錄數(shù)*/@Update("update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=#{id}")int startTask(@Param("id") long id);
}
編寫MediaFileProcessService
接口及其實(shí)現(xiàn)類,開啟一個(gè)任務(wù),只有搶到鎖的線程才可以成功開啟任務(wù)
/*** 開啟一個(gè)任務(wù)* @param id 任務(wù)id* @return true開啟任務(wù)成功,false開啟任務(wù)失敗*/
public boolean startTask(long id);
@Slf4j
@Service
public class MediaFileProcessServiceImpl implements MediaFileProcessService {@AutowiredMediaProcessMapper mediaProcessMapper;public boolean startTask(long id) {int result = mediaProcessMapper.startTask(id);return result<=0?false:true;}
}
更新待處理任務(wù)結(jié)果
任務(wù)處理完成需要更新待處理任務(wù)表
中status
字段的值,如果任務(wù)執(zhí)行成功還需要更新視頻的URL,將待處理任務(wù)記錄從表中刪除,同時(shí)向歷史任務(wù)表添加記錄
/*** @description 保存任務(wù)結(jié)果* @param taskId 任務(wù)id* @param status 任務(wù)狀態(tài)* @param fileId 文件id* @param url url 文件可訪問(wèn)的url* @param errorMsg 錯(cuò)誤信息*/
void saveProcessFinishStatus(Long taskId,String status,String fileId,String url,String errorMsg);
@Slf4j
@Service
public class MediaFileProcessServiceImpl implements MediaFileProcessService {@AutowiredMediaFilesMapper mediaFilesMapper;@AutowiredMediaProcessMapper mediaProcessMapper;@AutowiredMediaProcessHistoryMapper mediaProcessHistoryMapper;@Transactional@Overridepublic void saveProcessFinishStatus(Long taskId, String status, String fileId, String url, String errorMsg) {// 查出待處理任務(wù),如果不存在則直接返回MediaProcess mediaProcess = mediaProcessMapper.selectById(taskId);if(mediaProcess == null){return ;}// 任務(wù)處理失敗,更新任務(wù)處理結(jié)果LambdaQueryWrapper<MediaProcess> queryWrapperById = new LambdaQueryWrapper<MediaProcess>().eq(MediaProcess::getId, taskId);if(status.equals("3")){MediaProcess mediaProcess_u = new MediaProcess();mediaProcess_u.setStatus("3");mediaProcess_u.setErrormsg(errorMsg);mediaProcess_u.setFailCount(mediaProcess.getFailCount()+1);// 根據(jù)Id更新任務(wù)處理結(jié)果mediaProcessMapper.update(mediaProcess_u,queryWrapperById);log.debug("更新任務(wù)處理狀態(tài)為失敗,任務(wù)信息:{}",mediaProcess_u);return ;}// 任務(wù)處理成功MediaFiles mediaFiles = mediaFilesMapper.selectById(fileId);if(mediaFiles!=null){// 更新文件信息表中訪url字段mediaFiles.setUrl(url);mediaFilesMapper.updateById(mediaFiles);}// 更新待處理任務(wù)表的url和狀態(tài)mediaProcess.setUrl(url);mediaProcess.setStatus("2");mediaProcess.setFinishDate(LocalDateTime.now());mediaProcessMapper.updateById(mediaProcess);// 添加到歷史任務(wù)記錄表MediaProcessHistory mediaProcessHistory = new MediaProcessHistory();BeanUtils.copyProperties(mediaProcess, mediaProcessHistory);mediaProcessHistoryMapper.insert(mediaProcessHistory);// 從待處理任務(wù)表中刪除處理成功的任務(wù)mediaProcessMapper.deleteById(mediaProcess.getId());}
}
視頻轉(zhuǎn)碼處理
視頻上傳成功需要對(duì)視頻格式進(jìn)行處理,這里我們需要使用Java程序?qū)σ曨l進(jìn)行處理
視頻編碼
文件格式
: mp4、.avi、rmvb
等這些不同擴(kuò)展名的視頻文件的文件格式
編碼格式
: 視頻文件的內(nèi)容主要包括視頻和音頻,它們都會(huì)按照一定的編碼格式去編碼,播放器播放音視頻時(shí)需要根據(jù)它們的封裝格式去提取出編碼并解析
音視頻編碼格式
:通過(guò)音視頻的壓縮技術(shù)可以將原始視頻格式的文件轉(zhuǎn)換成另一種視頻格式的文件,即將視頻的編碼格式轉(zhuǎn)換成另一種編碼格式,目前最常用的編碼標(biāo)準(zhǔn)是視頻H.264,音頻AAC
MPEG系列視頻編碼
: Mpeg1(vcd),Mpeg2(DVD),Mpeg4(divx,xvid),Mpeg4 AVC(熱門)等音頻編碼
: MPEG Audio Layer 1/2、MPEG Audio Layer 3(mp3)、MPEG-2 AAC 、MPEG-4 AAC等H.26X系列視頻編碼
: H.261、H.262、H.263、H.263+、H.263++、H.264(MPEG4 AVC合作的結(jié)晶)
FFmpeg
視頻錄制完成后需要使用視頻編碼軟件對(duì)視頻進(jìn)行編碼如FFmpeg,將ffmpeg.exe
加入環(huán)境變量Path中后執(zhí)行ffmpeg -version
測(cè)試,詳情參考文檔
ffmpeg.exe -i 1.avi 1.mp4/mp3/gif
將一個(gè).avi
文件轉(zhuǎn)成mp4、mp3、gif
等文件
視頻處理工具類
測(cè)試使用java.lang.ProcessBuilder
執(zhí)行Windows命令
ProcessBuilder builder = new ProcessBuilder();
builder.command("C:\\Program Files (x86)\\Tencent\\QQ\\Bin\\QQScLauncher.exe");
// 將標(biāo)準(zhǔn)輸入流和錯(cuò)誤輸入流合并,通過(guò)標(biāo)準(zhǔn)輸入流程讀取信息
builder.redirectErrorStream(true);
// 執(zhí)行命令
Process p = builder.start();
在base工程的util包下創(chuàng)建Mp4VideoUtil
類是用于將視頻轉(zhuǎn)為mp4格式,使用Java程序調(diào)用ffmpeg.exe
命令將avi格式的視頻轉(zhuǎn)成mp4格式的文件
public static void main(String[] args) throws IOException {// ffmpeg.exe命令的位置String ffmpeg_path = "D:\\soft\\ffmpeg\\ffmpeg.exe";// 源avi視頻的路徑String video_path = "D:\\develop\\bigfile_test\\nacos01.avi";// 轉(zhuǎn)換后mp4文件的名稱String mp4_name = "nacos01.mp4";// 轉(zhuǎn)換后mp4文件的路徑String mp4_path = "D:\\develop\\bigfile_test\\nacos01.mp4";// 創(chuàng)建工具類對(duì)象Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpeg_path,video_path,mp4_name,mp4_path);// 開始視頻轉(zhuǎn)換,成功將返回successString s = videoUtil.generateMp4();System.out.println(s);
}
public class Mp4VideoUtil extends VideoUtil {String ffmpeg_path;String video_path;String mp4_name;String mp4folder_path;public Mp4VideoUtil(String ffmpeg_path, String video_path, String mp4_name, String mp4folder_path){super(ffmpeg_path);this.ffmpeg_path = ffmpeg_path;this.video_path = video_path;this.mp4_name = mp4_name;this.mp4folder_path = mp4folder_path;}// 清除已生成的mp4private void clear_mp4(String mp4_path){// 刪除原來(lái)已經(jīng)生成的m3u8及ts文件File mp4File = new File(mp4_path);if(mp4File.exists() && mp4File.isFile()){mp4File.delete();}}/*** 將視頻編碼生成對(duì)應(yīng)的mp4文件* @return 成功返回success,失敗返回控制臺(tái)日志*/public String generateMp4(){// 清除已生成的mp4clear_mp4(mp4folder_path);// 拼接命令ffmpeg.exe -i lucene.avi -c:v libx264 -s 1280x720 -pix_fmt yuv420p -b:a 63k -b:v 753k -r 18 .\lucene.mp4List<String> commend = new ArrayList<String>();commend.add(ffmpeg_path);commend.add("-i");commend.add(video_path);commend.add("-c:v");commend.add("libx264");commend.add("-y");//覆蓋輸出文件commend.add("-s");commend.add("1280x720");commend.add("-pix_fmt");commend.add("yuv420p");commend.add("-b:a");commend.add("63k");commend.add("-b:v");commend.add("753k");commend.add("-r");commend.add("18");commend.add(mp4folder_path);String outstring = null;// 使用Java程序調(diào)用`ffmpeg.exe`命令將avi格式的視頻轉(zhuǎn)成mp4格式的文件try {ProcessBuilder builder = new ProcessBuilder();builder.command(commend);// 將標(biāo)準(zhǔn)輸入流和錯(cuò)誤輸入流合并,通過(guò)標(biāo)準(zhǔn)輸入流程讀取信息builder.redirectErrorStream(true);Process p = builder.start();outstring = waitFor(p);} catch (Exception ex) {ex.printStackTrace();}Boolean check_video_time = this.check_video_time(video_path, mp4folder_path);if(!check_video_time){return outstring;}else{return "success";}}
}
視頻處理任務(wù)類
定義任務(wù)類VideoTask
編寫任務(wù)的邏輯代碼
并發(fā)處理
: 即每個(gè)視頻使用一個(gè)線程去處理,所以每次處理的視頻數(shù)量不要超過(guò)計(jì)算機(jī)的cpu核心數(shù)異步執(zhí)行任務(wù)
: 由于線程需要執(zhí)行的具體任務(wù)是在后臺(tái)異步執(zhí)行的,所以線程池啟動(dòng)多個(gè)線程的動(dòng)作瞬間完成的即我們定義的任務(wù)方法也會(huì)立刻完成,此時(shí)我們就需要設(shè)置一個(gè)計(jì)數(shù)器,保證所有線程都執(zhí)行完任務(wù)后程序才會(huì)往下執(zhí)行超時(shí)設(shè)置
: 線程阻塞時(shí)還要設(shè)置一個(gè)超時(shí)時(shí)間,防止程序出現(xiàn)未知異常(斷電),此時(shí)線程沒(méi)有執(zhí)行計(jì)數(shù)器減一的操作會(huì)導(dǎo)致其他線程無(wú)限期等待
@Slf4j
@Component
public class VideoTask {@AutowiredMediaFileService mediaFileService;@AutowiredMediaFileProcessService mediaFileProcessService;// ffmpeg.exe程序的位置@Value("${videoprocess.ffmpegpath}")String ffmpegpath;@XxlJob("videoJobHandler")public void videoJobHandler() throws Exception {// 分片參數(shù)int shardIndex = XxlJobHelper.getShardIndex();int shardTotal = XxlJobHelper.getShardTotal();List<MediaProcess> mediaProcessList = null;int size = 0;try {// 取出cpu核心數(shù)作為一次查詢視頻處理任務(wù)的最大數(shù)量int processors = Runtime.getRuntime().availableProcessors();mediaProcessList = mediaFileProcessService.getMediaProcessList(shardIndex, shardTotal, processors);// 實(shí)際查詢的任務(wù)數(shù)量size = mediaProcessList.size();log.debug("取出待處理視頻任務(wù){(diào)}條", size);if (size <= 0) {return;}} catch (Exception e) {e.printStackTrace();return;}// 創(chuàng)建一個(gè)包含size個(gè)線程的線程池,將來(lái)每一個(gè)線程對(duì)應(yīng)一個(gè)視頻處理任務(wù)ExecutorService threadPool = Executors.newFixedThreadPool(size);// 線程計(jì)數(shù)器,初始值就是我們的線程總數(shù),每當(dāng)一個(gè)線程執(zhí)行完后該值會(huì)減1CountDownLatch countDownLatch = new CountDownLatch(size);// 將待處理任務(wù)加入線程池mediaProcessList.forEach(mediaProcess -> {threadPool.execute(() -> {try {// 任務(wù)idLong taskId = mediaProcess.getId();// 各個(gè)線程基于樂(lè)觀鎖的原理開始搶任務(wù),只有獲取到鎖的線程才可以開啟任務(wù)boolean b = mediaFileProcessService.startTask(taskId);if (!b) {log.debug("搶占任務(wù)失敗,任務(wù)id:{}",taskId);return;}log.debug("開始執(zhí)行任務(wù):{}", mediaProcess);// 線程搶到任務(wù)后開始處理,根據(jù)待處理任務(wù)中包含的視頻文件信息,將其從Minio下載到本地服務(wù)器上String bucket = mediaProcess.getBucket();String filePath = mediaProcess.getFilePath();// objectNameString fileId = mediaProcess.getFileId();String filename = mediaProcess.getFilename();File originalFile = mediaFileService.downloadFileFromMinIO(mediaProcess.getBucket(), mediaProcess.getFilePath());if (originalFile == null) {log.debug("下載待處理文件失敗,originalFile:{}", mediaProcess.getBucket().concat(mediaProcess.getFilePath()));// 保存任務(wù)處理失敗的結(jié)果mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "下載待處理文件失敗");return;}// 下載成功后開始進(jìn)行轉(zhuǎn)碼// 創(chuàng)建臨時(shí)文件作為轉(zhuǎn)換后的文件File mp4File = null;try {mp4File = File.createTempFile("mp4", ".mp4");} catch (IOException e) {log.error("創(chuàng)建mp4臨時(shí)文件失敗");// 保存任務(wù)處理失敗的結(jié)果mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "創(chuàng)建mp4臨時(shí)文件失敗");return;}// 利用工具類對(duì)視頻進(jìn)行轉(zhuǎn)碼try {// 指定程序位置,源avi視頻文件路徑,轉(zhuǎn)碼后的文件名稱,轉(zhuǎn)碼后的文件路徑
Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpegpath, originalFile.getAbsolutePath(), mp4File.getName(), mp4File.getAbsolutePath());// 開始視頻轉(zhuǎn)換,成功將返回successString result = videoUtil.generateMp4();} catch (Exception e) {e.printStackTrace();log.error("處理視頻文件:{},出錯(cuò):{}", mediaProcess.getFilePath(), e.getMessage());}if (!result.equals("success")) {log.error("處理視頻失敗,視頻地址:{},錯(cuò)誤信息:{}", bucket + filePath, result);// 保存任務(wù)處理失敗的結(jié)果mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, result);return;}// 指定轉(zhuǎn)碼后的視頻在Minio中的存儲(chǔ)路徑,將轉(zhuǎn)碼后生成的視頻上傳至minioString objectName = getFilePath(fileId, ".mp4");// 保存視頻可訪問(wèn)的urlString url = "/" + bucket + "/" + objectName;try {mediaFileService.addMediaFilesToMinIO(mp4File.getAbsolutePath(), "video/mp4", bucket, objectName);// 任務(wù)處理成功,將url保存到文件信息表并更新狀態(tài)為成功,同時(shí)將處理成功的任務(wù)記錄刪除并存入歷史任務(wù)表mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "2", fileId, url, null);} catch (Exception e) {log.error("上傳視頻失敗或入庫(kù)失敗,視頻地址:{},錯(cuò)誤信息:{}", bucket + objectName, e.getMessage());// 保存任務(wù)處理失敗的結(jié)果mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "處理后視頻上傳或入庫(kù)失敗");}}finally {// 保證當(dāng)前線程完成任務(wù)后將計(jì)數(shù)器的值減1,這行代碼一定會(huì)執(zhí)行countDownLatch.countDown();}});});// 阻塞即當(dāng)所有線程都完成任務(wù)后程序才會(huì)下執(zhí)行,此時(shí)需要設(shè)置線程的最大等待時(shí)間防止無(wú)限期等待countDownLatch.await(30, TimeUnit.MINUTES);}// 獲取文件在Minio中完整的存儲(chǔ)路徑private String getFilePath(String fileMd5,String fileExt){return fileMd5.substring(0,1) + "/" + fileMd5.substring(1,2) + "/" + fileMd5 + "/" +fileMd5 +fileExt;}
}