



Background

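When a project has many data sources, it usually loads the data source connections into a cache at startup. After a data source is changed, how can the cached copy be refreshed automatically and in real time? In a single application you can simply call the refresh method directly, but what about a distributed deployment with several systems, each holding its own cache?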

Solution:

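Use Redis as a lightweight message queue (its publish/subscribe channels), which supports real-time notification and real-time status updates. Combined with AOP, it keeps the cached data source state up to date automatically on every instance.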
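The broadcast idea can be sketched without Redis at all. What follows is a minimal in-memory illustration, not the article's implementation: `Channel` is a hypothetical stand-in for a Redis pub/sub channel, and each `Node` stands in for one application instance with its own local cache. All class and method names are assumptions made for this example; only the `SAVE_OR_UPDATE` key comes from the listener code later in the article.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

class BroadcastSketch {

    /** Hypothetical listener contract: operation, data source code, connection config. */
    interface Listener {
        void onMessage(String operation, String code, String config);
    }

    /** In-memory stand-in for a Redis channel: every subscriber receives every message. */
    static class Channel {
        private final List<Listener> listeners = new CopyOnWriteArrayList<>();

        void subscribe(Listener listener) {
            listeners.add(listener);
        }

        void publish(String operation, String code, String config) {
            for (Listener listener : listeners) {
                listener.onMessage(operation, code, config);
            }
        }
    }

    /** One application instance holding its own local cache of data source configs. */
    static class Node implements Listener {
        final Map<String, String> cache = new ConcurrentHashMap<>();

        Node(Channel channel) {
            channel.subscribe(this);
        }

        @Override
        public void onMessage(String operation, String code, String config) {
            if ("SAVE_OR_UPDATE".equals(operation)) {
                cache.put(code, config);
            } else {
                // Mirrors the article's listener: anything else is treated as a removal.
                cache.remove(code);
            }
        }
    }

    public static void main(String[] args) {
        Channel channel = new Channel();
        Node a = new Node(channel);
        Node b = new Node(channel);
        // One publish reaches both nodes, so their caches stay in step.
        channel.publish("SAVE_OR_UPDATE", "ds1", "jdbc:example");
        System.out.println(a.cache.equals(b.cache));
    }
}
```

The point of the design is that the node performing the change does not need to know which other nodes exist; it only publishes, and every subscriber updates itself.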

The following walks through a usage example with code:

1. First, create the data source entity

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * @author ws
 * @since 2022-08-12
 */
@Getter
@Setter
@ToString
@Accessors(chain = true)
@TableName("ed_datasource_info")
public class DatasourceInfo implements Serializable {

    private static final long serialVersionUID = 1L;

    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;

    /** Data source code */
    @TableField("datasource_code")
    private String datasourceCode;

    /** Data source name */
    @TableField("datasource_name")
    private String datasourceName;

    /** Data source type */
    @TableField("datasource_type")
    private String datasourceType;

    /** Type: 0 = database, 1 = REST API */
    @TableField("type")
    private Integer type;

    /** Creator */
    @TableField("creator")
    private String creator;

    /** Schema */
    @TableField("schema_name")
    private String schemaName;

    @TableField("create_time")
    private Date createTime;

    @TableField("update_time")
    private Date updateTime;

    /** Data source connection info (JSON) */
    @TableField("link_json")
    private String linkJson;
}

2. Load the data sources at startup

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.sztech.common.constant.DataSourceTypeEnum;
import com.sztech.entity.DatasourceInfo;
import com.sztech.service.DatasourceInfoService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class DataSourceRecovery implements InitializingBean {

    @Resource
    private DatasourceInfoService datasourceInfoService;

    @Override
    public void afterPropertiesSet() throws Exception {
        refresh();
    }

    private void refresh() throws Exception {
        this.refresh(null);
    }

    public void refresh(String sourceCode) {
        QueryWrapper<DatasourceInfo> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("type", DataSourceTypeEnum.DB.getKey());
        if (StringUtils.isNotBlank(sourceCode)) {
            queryWrapper.eq("datasource_code", sourceCode);
        }
        List<DatasourceInfo> list = datasourceInfoService.list(queryWrapper);
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        CountDownLatch countDownLatch = new CountDownLatch(list.size());
        for (DatasourceInfo datasourceInfo : list) {
            new Thread(new ReadloadThread(datasourceInfo, countDownLatch)).start();
        }
        try {
            // await returns false when the timeout elapses before every loader has finished
            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
                log.error("Timed out waiting for data sources to load");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it
            Thread.currentThread().interrupt();
            log.error("Interrupted while waiting for data sources to load", e);
        }
    }

    /**
     * Loads data sources on multiple threads to speed up startup.
     */
    static class ReadloadThread implements Runnable {

        private DatasourceInfo datasourceInfo;
        private CountDownLatch countDownLatch;

        public ReadloadThread() {
        }

        public ReadloadThread(DatasourceInfo datasourceInfo, CountDownLatch countDownLatch) {
            this.datasourceInfo = datasourceInfo;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                DataSourceContext.setClientMap(datasourceInfo);
                DataSourceContext.setConfigMap(datasourceInfo.getDatasourceCode(), datasourceInfo);
            } catch (Exception e) {
                log.error("datasource:{}, load failed", datasourceInfo.getDatasourceCode(), e);
            } finally {
                countDownLatch.countDown();
            }
        }
    }
}
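The loader above starts one raw thread per data source. As a hedged alternative, the same latch pattern can run on a bounded thread pool, which caps concurrency when there are hundreds of data sources. This is a standalone sketch under stated assumptions: `ParallelLoadSketch`, `loadAll`, the pool size of 8, and the `Function<String, String>` loader (standing in for building a client from a data source code) are all hypothetical and not part of the article's code.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class ParallelLoadSketch {

    /**
     * Loads every code through the loader on a bounded pool and stores results in the cache.
     * The cache must be thread-safe. Returns true only if all loaders finished before the timeout.
     */
    static boolean loadAll(List<String> codes,
                           Function<String, String> loader,
                           Map<String, String> cache,
                           long timeout,
                           TimeUnit unit) {
        if (codes.isEmpty()) {
            return true;
        }
        // Assumed cap of 8 threads; tune to the number and latency of the data sources.
        ExecutorService pool = Executors.newFixedThreadPool(Math.min(codes.size(), 8));
        CountDownLatch latch = new CountDownLatch(codes.size());
        try {
            for (String code : codes) {
                pool.execute(() -> {
                    try {
                        cache.put(code, loader.apply(code));
                    } catch (RuntimeException e) {
                        // One failing data source must not block the others.
                        System.err.println("load failed for " + code + ": " + e.getMessage());
                    } finally {
                        latch.countDown();
                    }
                });
            }
            // false means the timeout elapsed, which is different from being interrupted.
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }
}
```

Counting down in `finally` matters in both versions: a loader that throws would otherwise leave the latch stuck until the timeout.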

3. Create DataSourceContext to cache the data source connections

import com.sztech.core.tool.DBTool;
import com.sztech.entity.DatasourceInfo;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * User: wangsheng
 * Date: 2022-02-11
 * Time: 14:05
 */
public class DataSourceContext {

    /** Client cache */
    private final static Map<String, IClient> clientMap = new ConcurrentHashMap<>();

    /** Data source config cache */
    private final static Map<String, DatasourceInfo> configMap = new ConcurrentHashMap<>();

    public static void setClientMap(DatasourceInfo datasourceInfo) {
        // Close the client being replaced, if any, before building the new one
        if (clientMap.containsKey(datasourceInfo.getDatasourceCode())) {
            try {
                clientMap.get(datasourceInfo.getDatasourceCode()).close();
            } catch (Exception ignored) {
            }
        }
        clientMap.put(datasourceInfo.getDatasourceCode(), DBTool.buildClient(datasourceInfo));
    }

    public static void setConfigMap(String key, DatasourceInfo datasourceInfo) {
        configMap.put(key, datasourceInfo);
    }

    public static void removeClientMap(String key) {
        if (clientMap.containsKey(key)) {
            try {
                clientMap.get(key).close();
            } catch (Exception ignored) {
            }
        }
        clientMap.remove(key);
    }

    public static void removeConfigMap(String key) {
        configMap.remove(key);
    }

    public static IClient getClientMap(String key) {
        IClient client = clientMap.get(key);
        if (null == client) {
            throw new RuntimeException(String.format("Data source code [%s] does not exist or has been deleted...", key));
        }
        return client;
    }

    public static DatasourceInfo getConfigMap(String key) {
        DatasourceInfo datasourceInfo = configMap.get(key);
        if (null == datasourceInfo) {
            throw new RuntimeException(String.format("Data source code [%s] does not exist or has been deleted...", key));
        }
        return datasourceInfo;
    }
}
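`setClientMap` and `removeClientMap` check `containsKey`, then `get`, then `put` or `remove` as separate steps, so two concurrent updates to the same code could interleave. A possible tightening, shown here only as a sketch, relies on `ConcurrentHashMap.put` and `remove` returning the previous value atomically. The `Client` interface and the `ClientCacheSketch` class are hypothetical stand-ins for `IClient` and `DataSourceContext`. Note one behavioural difference from the article's code: here the new client is published first and the old one is closed afterwards.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ClientCacheSketch {

    /** Hypothetical minimal client: only the close() contract matters for this example. */
    interface Client {
        void close();
    }

    private static final Map<String, Client> CLIENTS = new ConcurrentHashMap<>();

    /** Publishes the new client, then closes whichever client it displaced. */
    static void replace(String code, Client fresh) {
        Client old = CLIENTS.put(code, fresh); // put returns the previous mapping atomically
        closeQuietly(old);
    }

    /** Removes and closes the client in one atomic map operation. */
    static void remove(String code) {
        closeQuietly(CLIENTS.remove(code));
    }

    static Client get(String code) {
        Client client = CLIENTS.get(code);
        if (client == null) {
            throw new IllegalStateException(
                    "Data source code [" + code + "] does not exist or has been deleted");
        }
        return client;
    }

    private static void closeQuietly(Client client) {
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (RuntimeException e) {
            // Log instead of swallowing silently, so leaked pools are at least visible.
            System.err.println("close failed: " + e.getMessage());
        }
    }
}
```

Because each client is returned by exactly one `put` or `remove` call, it is closed exactly once even under concurrent updates.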
package com.sztech.core.tool;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.odps.Instance;
import com.sztech.common.constant.ResultEnum;
import com.sztech.common.exception.BizException;
import com.sztech.common.utils.ReflectionUtils;
import com.sztech.common.utils.SpringUtils;
import com.sztech.common.utils.ThreadPoolUtil;
import com.sztech.core.datasource.DataSourceContext;
import com.sztech.core.datasource.IClient;
import com.sztech.core.datasource.rdbms.RdbmsConfig;
import com.sztech.entity.*;
import com.sztech.pojo.dto.ColumnDto;
import com.sztech.pojo.dto.QueryTableDto;
import com.sztech.pojo.dto.TableDto;
import com.sztech.pojo.node.PartitionColumn;
import com.sztech.pojo.vo.*;
import com.sztech.service.CreateTableLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;

import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Description:
 * User: wangsheng
 * Date: 2022-08-12
 * Time: 16:59
 */
@Slf4j
public class DBTool {/*** 建立客戶端*/public static IClient buildClient(DatasourceInfo datasourceInfo) {IClient client = ReflectionUtils.getInstanceFromCache(datasourceInfo.getDatasourceType(), "type", IClient.class);return client.open(datasourceInfo);}/*** 測試數(shù)據(jù)源** @return*/public static boolean testSource(DatasourceInfo datasourceInfo) {IClient client = ReflectionUtils.getInstanceFromCache(datasourceInfo.getDatasourceType(), "type", IClient.class);return client.testSource(datasourceInfo);}public static List<String> getSchemas(DatasourceInfo datasourceInfo) {List<String> schemas = new ArrayList<>();Connection conn = null;try {IClient client = ReflectionUtils.getInstanceFromCache(datasourceInfo.getDatasourceType(), "type", IClient.class);Class.forName(client.driverName());String linkJson = datasourceInfo.getLinkJson();RdbmsConfig rdbmsConfig = JSONObject.parseObject(linkJson).toJavaObject(RdbmsConfig.class);conn = DriverManager.getConnection(rdbmsConfig.getJdbcUrl(), rdbmsConfig.getUsername(), rdbmsConfig.getDecodePassword());DatabaseMetaData metadata = conn.getMetaData();try (ResultSet resultSet = metadata.getSchemas()) {while (resultSet.next()) {String schemaName = resultSet.getString("TABLE_SCHEM");schemas.add(schemaName);}}} catch (SQLException e) {throw new RuntimeException(e);} catch (ClassNotFoundException e) {throw new RuntimeException(e);} finally {if (conn != null) {try {conn.close();} catch (SQLException ex) {ex.printStackTrace();}}}return schemas;}/*** 獲取驅(qū)動名稱*/public static String getDriverName(String datasourceType) {IClient client = ReflectionUtils.getInstanceFromCache(datasourceType, "type", IClient.class);return client.driverName();}/*** 獲取表中列信息*/public static List<ColumnDto> getColumns(String datasourceCode, String tableName) {return DataSourceContext.getClientMap(datasourceCode).getColumns(tableName);}/*** 獲取表中分區(qū)列信息*/public static List<String> getPartitionColumns(String datasourceCode, String tableName) {return 
DataSourceContext.getClientMap(datasourceCode).getPartitionColumns(tableName);}/*** 獲取表信息*/public static List<String> getTableNames(String datasourceCode, String tableNameLike) {return DataSourceContext.getClientMap(datasourceCode).getTableNames(tableNameLike);}/*** 獲取表信息*/public static List<TableDto> getTables(String datasourceCode) {return DataSourceContext.getClientMap(datasourceCode).getTables();}/*** 獲取單個表信息*/public static TableDto getTableByName(String datasourceCode, String tableName) {return DataSourceContext.getClientMap(datasourceCode).getTableByName(tableName);}/*** 獲取單個表信息(創(chuàng)建時間,字段數(shù))*/public static TableDto getTableField(String datasourceCode, String tableName) {return DataSourceContext.getClientMap(datasourceCode).getTableField(tableName);}/*** 獲取表信息(獲取創(chuàng)建時間)** @param dto* @return*/public static TableInfoVo getTableData(QueryTableDto dto) {IClient client = DataSourceContext.getClientMap(dto.getDataSourceCode());return client.getTableInfo(dto.getTableName());}/*** 根據(jù)字段type建表*/public static void createTableByColumns(List<ColumnDto> columnDtos, String tableName, String datasourceCode) {IClient client = DataSourceContext.getClientMap(datasourceCode);List<String> sqls = client.buildTableSql(columnDtos, tableName, true);log.info("執(zhí)行建表語句為:" + JSON.toJSONString(sqls));sqls.forEach(s -> client.executeCommandSyn(s, new HashMap<>()));}/*** 根據(jù)字段type建表*/public static void createTableByNotTransformedColumns(List<ColumnDto> columnDtos, String tableName, String datasourceCode) {IClient client = DataSourceContext.getClientMap(datasourceCode);List<String> sqls = client.buildTableSql(columnDtos, tableName, false);log.info("執(zhí)行建表語句為:" + JSON.toJSONString(sqls));sqls.forEach(s -> client.executeCommandSyn(s, new HashMap<>()));}/*** 創(chuàng)建索引* 注: oracle 索引名在整個庫里必須唯一 否則建立失敗** @param datasourceCode 數(shù)據(jù)源編碼* @param tableName      表名* @param filedNames     filed1,filed2...* @param unique         唯一*/public static void 
createIndex(String datasourceCode, String tableName, String filedNames, Boolean unique) {DataSourceContext.getClientMap(datasourceCode).createIndex(tableName, filedNames, unique);}/*** sql校驗** @param datasourceCode* @param sql* @param sourceType* @return*/public static Map<String, Object> checkSql(String datasourceCode, String sql, String sourceType) {IClient client = DataSourceContext.getClientMap(datasourceCode);return client.checkSql(sql, sourceType);}/*** 根據(jù)sql創(chuàng)建表** @param datasourceCode* @param sql*/public static void createTableWithSql(String datasourceCode, String sql) {IClient client = DataSourceContext.getClientMap(datasourceCode);log.info("執(zhí)行建表語句為:" + JSON.toJSONString(sql));client.executeCommandSyn(sql, new HashMap<>());
//        DataSourceContext.getClientMap(datasourceCode).createTableWithSql(sql);}/*** 刪除表** @param datasourceCode* @param tableName*/public static void dropTable(String datasourceCode, String tableName) {DataSourceContext.getClientMap(datasourceCode).dropTable(tableName);}/*** 單表查詢數(shù)據(jù)*/public static List<Map<String, Object>> selectDataFromTable(String datasourceCode, List<DataTableColumn> columns, String tableName, String search, Integer limit) {IClient client = DataSourceContext.getClientMap(datasourceCode);// 獲取查詢語句String selectSql = client.getSelectSql(columns, tableName, search, limit);log.info("執(zhí)行語句:" + selectSql);return client.selectDataFromTable(selectSql, null);}/*** 單表查詢數(shù)據(jù)*/public static List<Map<String, Object>> selectFromTable(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, Integer pageNum, Integer pageSize, MapSqlParameterSource params) {IClient client = DataSourceContext.getClientMap(datasourceCode);// 獲取查詢語句String selectSql = client.getFormSelectSql(columns, searchColumns, tableName, search, pageNum, pageSize, params);log.info("執(zhí)行語句:" + selectSql);return client.selectDataFromTable(selectSql, params);}/*** 單表查詢數(shù)據(jù)*/public static List<Map<String, Object>> selectFromForBackUp(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, Integer pageNum, Integer pageSize, MapSqlParameterSource params) {IClient client = DataSourceContext.getClientMap(datasourceCode);// 獲取查詢語句String selectSql = client.selectFromForBackUp(columns, searchColumns, tableName, search, pageNum, pageSize, params);log.info("執(zhí)行語句:" + selectSql);return client.selectDataFromTable(selectSql, params);}/*** 單表查詢數(shù)據(jù)*/public static List<Map<String, Object>> selectFromFile(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, Integer pageNum, Integer pageSize, 
MapSqlParameterSource params) {IClient client = DataSourceContext.getClientMap(datasourceCode);// 獲取查詢語句String selectSql = client.getFormSelectSqlForFile(columns, searchColumns, tableName, search, pageNum, pageSize, params);log.info("執(zhí)行語句:" + selectSql);return client.selectDataFromTable(selectSql, params);}/*** 查詢單表是否存在文件名*/public static List<Map<String, Object>> getExistOldName(String datasourceCode, String tableName, String search) {IClient client = DataSourceContext.getClientMap(datasourceCode);// 獲取查詢語句String selectSql = client.getExistOldName( tableName, search);log.info("執(zhí)行語句:" + selectSql);return client.selectDataFromTable(selectSql, null);}/*** 單表查詢數(shù)據(jù)(查詢歸集表專門使用)*/public static List<Map<String, Object>> selectCollectTable(CollectConditionVo vo) {IClient client = DataSourceContext.getClientMap(vo.getDatasourceCode());// 獲取查詢語句String selectSql = client.getCollectTable(vo);log.info("執(zhí)行語句:" + selectSql);return client.selectDataFromTable(selectSql, vo.getParams());}/*** 單表查詢數(shù)據(jù)量*/public static Map<String, Object> getFormCount(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, MapSqlParameterSource params) {IClient client = DataSourceContext.getClientMap(datasourceCode);// 獲取查詢語句String selectSql = client.getCountSql(columns, searchColumns, tableName, search, params);log.info("執(zhí)行語句:" + selectSql);return client.getCount(selectSql, params);}/*** 查詢區(qū)縣庫表的數(shù)據(jù)量*/public static Map<String, Object> getCountryCount(String datasourceCode, String tableName,  MapSqlParameterSource params) {IClient client = DataSourceContext.getClientMap(datasourceCode);// 獲取查詢語句String selectSql ="select count(1) as count from "+tableName;log.info("執(zhí)行語句:" + selectSql);return client.getCount(selectSql, params);}public static Map<String, Object> getFormCountForFile(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, 
MapSqlParameterSource params) {IClient client = DataSourceContext.getClientMap(datasourceCode);// 獲取查詢語句String selectSql = client.getCountSqlForFile(columns, searchColumns, tableName, search, params);log.info("執(zhí)行語句:" + selectSql);return client.getCount(selectSql, params);}/*** 查詢表數(shù)據(jù)量*/public static Long getTableRows(String datasourceCode, String tableName) {IClient client = DataSourceContext.getClientMap(datasourceCode);return client.getTableRows(tableName);}/*** 查詢表對應(yīng)分區(qū)數(shù)據(jù)量*/public static Long getTablePartitionRows(String datasourceCode, String tableName, List<PartitionColumn> partitionColumns) {IClient client = DataSourceContext.getClientMap(datasourceCode);return client.getTablePartitionRows(tableName, partitionColumns);}/*** 查詢表數(shù)據(jù)量*/public static Integer getTablePhysicalSize(String datasourceCode, String tableName) {IClient client = DataSourceContext.getClientMap(datasourceCode);return client.getPhysicalSize(tableName);}/*** 獲取表最大值** @param datasourceCode 數(shù)據(jù)源編碼* @param tableName      表名* @param incColumnName  自增列名* @return {@link Integer}*/public static Object getMaxValue(String datasourceCode, String tableName, String incColumnName, String condition) {return DataSourceContext.getClientMap(datasourceCode).getMaxValue(tableName, incColumnName, condition);}public static Object getMaxValue(String datasourceCode, String schema, String tableName, String incColumnName, String condition) {return DataSourceContext.getClientMap(datasourceCode).getMaxValue(schema, tableName, incColumnName, condition);}public static Object getMaxTime(String datasourceCode, String schema, String tableName, String incColumnName, String tongId,String condition) {return DataSourceContext.getClientMap(datasourceCode).getMaxTime(schema, tableName, incColumnName,tongId, condition);}/*** 字段存在** @param datasourceCode 數(shù)據(jù)源編碼* @param tableName      表名* @param fieldName      字段名* @return {@link Boolean}*/public static Boolean fieldExist(String 
datasourceCode, String tableName, String fieldName) {List<ColumnDto> columns = getColumns(datasourceCode, tableName);return columns.stream().anyMatch(s -> s.getName().equalsIgnoreCase(fieldName));}/*** 數(shù)據(jù)預(yù)覽 獲取前十條** @return*/public static String dataView(String datasourceCode, String tableName, String condition) {return DataSourceContext.getClientMap(datasourceCode).dataView(tableName, condition);}/*** 創(chuàng)建分區(qū)臨時表* odps適用*/public static void createPartitionedTableByColumns(List<ColumnDto> columnDtos, String tableName, String tableComment, String partitionedField, String datasourceCode) {DataSourceContext.getClientMap(datasourceCode).createPartitionedTableByColumns(columnDtos, tableName, tableComment, partitionedField);}/*** 同步執(zhí)行命令*/public static void executeCommandSyn(String datasourceCode, String command, Map<String, Object> params) {DataSourceContext.getClientMap(datasourceCode).executeCommandSyn(command, params);}/*** 異步執(zhí)行命令* odps適用*/public static Instance executeCommandASyn(String datasourceCode, String command, Map<String, Object> params) {return DataSourceContext.getClientMap(datasourceCode).executeCommandASyn(command, params);}/*** 是否有導(dǎo)出權(quán)限* odps適用** @param datasourceCode 數(shù)據(jù)源編碼* @param tableName      表名* @return {@link Boolean}*/public static Boolean exportEnable(String datasourceCode, String tableName) {return DataSourceContext.getClientMap(datasourceCode).exportEnable(tableName);}/*** 插入單條數(shù)據(jù)** @param datasourceCode* @param vo* @return*/public static Integer insert(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).insert(vo);}/*** 批量插入數(shù)據(jù)** @param datasourceCode* @param vo* @return*/public static Integer[] betchInsert(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).betchInsert(vo);}/*** 批量插入數(shù)據(jù)** @param datasourceCode* @param vo* @return*/public static Integer[] betchInsertByConnection(String 
datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).betchInsertByConnection(vo);}/*** 這個方法不需要分裝參數(shù),直接傳字段名稱list就好了* @param datasourceCode* @param vo* @return*/public static Integer[] betchInsertForCommom(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).betchInsertForCommom(vo);}/*** 刪除數(shù)據(jù)** @param datasourceCode* @param vo* @return*/public static Integer delete(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).delete(vo);}/*** 這個刪除方法可以自定義條件服號* @param datasourceCode* @param vo* @return*/public static Integer deleteForCommon(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).deleteForCommon(vo);}public static Integer deleteForFile(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).deleteForFile(vo);}public static String deleteForPre(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).deleteForPre(vo);}/*** 修改數(shù)據(jù)** @param datasourceCode* @param vo* @return*/public static Integer update(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).update(vo);}/*** 修改數(shù)據(jù)** @param datasourceCode* @param vo* @return*/public static Integer updateForFile(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).updateForFile(vo);}/*** 獲取表單基本信息** @param vo* @return*/public static TableMetaDataVo getTableBasicInfo(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).getTableBasicInfo(vo);}/*** 根據(jù)字段type建表*/public static void createCollectTable(List<CatalogColumnInfo> columnDtos, String tableName, String datasourceCode, String tableComment, Boolean ifPartition) {IClient client = DataSourceContext.getClientMap(datasourceCode);List<String> sqls = client.buildTableSqlForCollect(columnDtos, 
tableName, tableComment, ifPartition);log.info("執(zhí)行建表語句為:" + JSON.toJSONString(sqls));try {sqls.forEach(s -> client.executeCommandSyn(s, new HashMap<>()));} catch (Exception e) {e.printStackTrace();String message = e.getMessage();if (e instanceof BizException) {BizException exception = (BizException) e;message = exception.getMsg();}log.error("建表錯誤=======================>{}:", message);ThreadPoolExecutor instance = ThreadPoolUtil.instance();String finalMessage = message;instance.submit(() -> {CreateTableLog createTableLog = new CreateTableLog();createTableLog.setErrorLog(finalMessage);createTableLog.setParams(JSON.toJSONString(sqls));createTableLog.setCode(tableName);CreateTableLogService createTableLogService = SpringUtils.getBean(CreateTableLogService.class);createTableLogService.save(createTableLog);});throw new BizException(ResultEnum.ERROR.getCode(), "建表失敗請聯(lián)系管理員");}}/*** 根據(jù)字段type建表*/public static void updateCollectTable(CreateCollectVo vo) {IClient client = DataSourceContext.getClientMap(vo.getDatasourceCode());List<String> sqls = client.buildTableSqlForUpdate(vo);log.info("執(zhí)行更新表語句為:" + JSON.toJSONString(sqls));try {sqls.forEach(s -> client.executeCommandSyn(s, new HashMap<>()));} catch (Exception e) {e.printStackTrace();String message = e.getMessage();if (e instanceof BizException) {BizException exception = (BizException) e;message = exception.getMsg();}log.error("建表錯誤=======================>{}:", message);ThreadPoolExecutor instance = ThreadPoolUtil.instance();String finalMessage = message;instance.submit(() -> {CreateTableLog createTableLog = new CreateTableLog();createTableLog.setErrorLog(finalMessage);createTableLog.setParams(JSON.toJSONString(sqls));createTableLog.setCode(vo.getTableName());CreateTableLogService createTableLogService = SpringUtils.getBean(CreateTableLogService.class);createTableLogService.save(createTableLog);});log.info("建表失敗了開始準(zhǔn)備拋出了-------------------------------------->");throw new 
BizException(ResultEnum.ERROR.getCode(), "建表失敗請聯(lián)系管理員");}}/*** 獲取數(shù)據(jù)源下所有表信息(包括表名,表字段數(shù),表創(chuàng)建時間)** @param datasourceCode* @param tableNameLike* @return*/public static List<TableDto> getTablesDetail(String datasourceCode, String tableNameLike, Integer start, Integer pageSize, String specifyTableName) {return DataSourceContext.getClientMap(datasourceCode).getTablesDetail(tableNameLike, start, pageSize, specifyTableName);}/*** 獲取表數(shù)量* @param datasourceCode* @param tableName* @return*/public static Long getTableCountSchema(String datasourceCode, String tableName) {return DataSourceContext.getClientMap(datasourceCode).getTableCountSchema(tableName);}public static Integer getTableColumnCount(String dataSourceCode, String tableName) {return DataSourceContext.getClientMap(dataSourceCode).getTableColumnCount(tableName);}public static Integer getPreTableColumnCount(String dataSourceCode, String tableName) {return DataSourceContext.getClientMap(dataSourceCode).getPreTableColumnCount(tableName);}/*** 獲取符號* @return*/public static String getSymbol(String datasourceCode) {return DataSourceContext.getClientMap(datasourceCode).getSymbol();}}
import lombok.extern.slf4j.Slf4j;
import org.reflections.Reflections;
import java.lang.reflect.Modifier;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
public class ReflectionUtils {

    private static final Map<String, Set<?>> clazzMap = new ConcurrentHashMap<>();
    private static final ReentrantLock clazzLock = new ReentrantLock();

    /**
     * Finds all implementations of an interface or abstract class via reflection.
     * Class information is cached to cut lookup time.
     * The interface or abstract class must live in the same package as its
     * implementations, or in a parent package.
     */
    @SuppressWarnings("unchecked")
    public static <T> Set<Class<? extends T>> getReflections(Class<T> clazz) {
        if (clazzMap.containsKey(clazz.getName())) {
            return (Set<Class<? extends T>>) clazzMap.get(clazz.getName());
        }
        // Acquire the lock before the try block so unlock() only runs if lock() succeeded
        clazzLock.lock();
        try {
            if (clazzMap.containsKey(clazz.getName())) {
                return (Set<Class<? extends T>>) clazzMap.get(clazz.getName());
            }
            Reflections reflections = new Reflections(clazz.getPackage().getName());
            Set<Class<? extends T>> subTypesOf = reflections.getSubTypesOf(clazz);
            clazzMap.put(clazz.getName(), subTypesOf);
            return subTypesOf;
        } catch (Exception e) {
            log.error("getReflections error", e);
        } finally {
            clazzLock.unlock();
        }
        return new HashSet<>();
    }

    /**
     * Creates a new instance via reflection.
     *
     * @param type type
     * @param methodName methodName
     * @param clazz clazz
     * @return <T>
     */
    public static <T> T getInstance(String type, String methodName, Class<T> clazz) {
        Set<Class<? extends T>> set = getReflections(clazz);
        for (Class<? extends T> t : set) {
            try {
                // Skip abstract classes
                if (Modifier.isAbstract(t.getModifiers())) {
                    continue;
                }
                Object obj = t.getMethod(methodName).invoke(t.newInstance());
                if (type.equalsIgnoreCase(obj.toString())) {
                    return t.newInstance();
                }
            } catch (Exception e) {
                log.error("getInstance error", e);
            }
        }
        throw new RuntimeException("implement class not exist");
    }

    /**
     * Creates a new instance via reflection.
     *
     * @param type type
     * @param methodName methodName
     * @param clazz clazz
     * @return <T>
     */
    public static <T> T getInstanceFromCache(String type, String methodName, Class<T> clazz) {
        return getInstance(type, methodName, clazz);
    }
}
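`getInstance` walks every implementation class on each lookup and instantiates each candidate just to read its `type()`, and `getInstanceFromCache` simply delegates to it. If that cost matters, one option is to resolve the type string once and keep a factory per type. The sketch below illustrates that idea with plain Java only; `TypeRegistrySketch`, its nested `Client` interface, `MysqlClient`, and the `"MYSQL"` type string are hypothetical, and only the `"DMDB"` type string comes from the article.

```java
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class TypeRegistrySketch {

    /** Hypothetical slice of the client interface: only type() is needed here. */
    interface Client {
        String type();
    }

    static class DmClient implements Client {
        @Override
        public String type() {
            return "DMDB";
        }
    }

    /** Hypothetical second implementation, included only to show dispatch by type. */
    static class MysqlClient implements Client {
        @Override
        public String type() {
            return "MYSQL";
        }
    }

    private static final Map<String, Supplier<? extends Client>> FACTORIES = new ConcurrentHashMap<>();

    /** Calls type() once at registration time and remembers the factory under that key. */
    static void register(Supplier<? extends Client> factory) {
        String type = factory.get().type();
        FACTORIES.put(type.toUpperCase(Locale.ROOT), factory);
    }

    /** Case-insensitive lookup that returns a fresh instance on every call. */
    static Client newClient(String type) {
        Supplier<? extends Client> factory = FACTORIES.get(type.toUpperCase(Locale.ROOT));
        if (factory == null) {
            throw new IllegalArgumentException("no client registered for type " + type);
        }
        return factory.get();
    }
}
```

Usage would look like `TypeRegistrySketch.register(TypeRegistrySketch.DmClient::new)` at startup, followed by `TypeRegistrySketch.newClient("dmdb")` per data source. The trade-off is explicit registration instead of classpath scanning.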

The IClient interface adapts the code to multiple kinds of data source


import com.ws.websocket.entity.DatasourceInfo;

/**
 * Description:
 * User: wangsheng
 * Date: 2022-12-30
 * Time: 10:31
 */
public interface IClient {

    /**
     * Connects to the data source.
     *
     * @param dataSourceInfo data source info
     * @return {@link IClient}
     */
    IClient open(DatasourceInfo dataSourceInfo);

    /**
     * Closes the data source.
     */
    void close();

    /**
     * Driver class name.
     *
     * @return
     */
    String driverName();

    /**
     * Data source type.
     *
     * @return {@link String}
     */
    String type();

    /**
     * Tests the data source.
     *
     * @param datasourceInfo
     * @return
     */
    boolean testSource(DatasourceInfo datasourceInfo);
}

import com.ws.websocket.entity.DatasourceInfo;
// Common query logic
public abstract class AbsClient implements IClient {

    protected DatasourceInfo datasourceInfo;
}
package com.ws.websocket.util;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson.JSONObject;
import com.ws.websocket.entity.DatasourceInfo;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

@Slf4j
public abstract class AbsRdbmsClient extends AbsClient {

    protected DruidDataSource druidDataSource;

    @Override
    public IClient open(DatasourceInfo datasourceInfo) {
        RdbmsConfig rdbmsConfig = JSONObject.parseObject(datasourceInfo.getLinkJson()).toJavaObject(RdbmsConfig.class);
        DruidDataSource druidDataSource = new DruidDataSource();
        druidDataSource.setInitialSize(5);
        druidDataSource.setMinIdle(30);
        druidDataSource.setMaxActive(300);
        druidDataSource.setMaxWait(10000);
        druidDataSource.setBreakAfterAcquireFailure(true); // break out of the retry loop
        druidDataSource.setConnectionErrorRetryAttempts(3); // retry three times
        druidDataSource.setTimeBetweenConnectErrorMillis(3000);
        druidDataSource.setLoginTimeout(3);
        druidDataSource.setUrl(rdbmsConfig.getJdbcUrl());
        druidDataSource.setDriverClassName(driverName());
        druidDataSource.setUsername(rdbmsConfig.getUsername());
        // Decrypt
        // druidDataSource.setPassword(RsaUtils.decode(rdbmsConfig.getPassword()));
        druidDataSource.setPassword(rdbmsConfig.getPassword());
        // Parameters required by the MetaUtil helper
        Properties properties = new Properties();
        properties.put("remarks", "true");
        properties.put("useInformationSchema", "true");
        druidDataSource.setConnectProperties(properties);
        this.druidDataSource = druidDataSource;
        this.datasourceInfo = datasourceInfo;
        return this;
    }

    @Override
    public void close() {
        // Guard against close() being called on a client that was never opened
        if (druidDataSource != null) {
            druidDataSource.close();
        }
    }

    @Override
    public boolean testSource(DatasourceInfo datasourceInfo) {
        Connection connection = null;
        try {
            Class.forName(driverName());
            String linkJson = datasourceInfo.getLinkJson();
            RdbmsConfig rdbmsConfig = JSONObject.parseObject(linkJson).toJavaObject(RdbmsConfig.class);
            connection = DriverManager.getConnection(rdbmsConfig.getJdbcUrl(), rdbmsConfig.getUsername(), rdbmsConfig.getPassword());
            // Valid if the connection responds within 3 seconds
            return connection.isValid(3);
        } catch (SQLException e) {
            log.error("Data source test failed", e);
            return false;
        } catch (ClassNotFoundException e) {
            log.error("Driver not found: {}", driverName());
            return false;
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException ex) {
                    ex.printStackTrace();
                }
            }
        }
    }

    // Declared static so it can be instantiated during JSON deserialization
    // without an enclosing AbsRdbmsClient instance
    @Data
    static class RdbmsConfig {

        private String jdbcUrl;
        private String username;
        private String password;

        public void setSSL() {
            String lowerCase = this.jdbcUrl.toLowerCase();
            if (!lowerCase.contains("usessl")) {
                if (this.jdbcUrl.contains("?")) {
                    this.jdbcUrl = this.jdbcUrl + "&useSSL=false";
                } else {
                    this.jdbcUrl = this.jdbcUrl + "?useSSL=false";
                }
            }
        }
    }
}
import com.alibaba.fastjson.JSONObject;
import com.ws.websocket.entity.DatasourceInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

@Slf4j
public class DmClient extends AbsRdbmsClient {

    private String schema;

    @Override
    public String type() {
        return "DMDB";
    }

    @Override
    public String driverName() {
        return "dm.jdbc.driver.DmDriver";
    }

    @Override
    public IClient open(DatasourceInfo datasourceInfo) {
        RdbmsConfig commonLinkParams = JSONObject.parseObject(datasourceInfo.getLinkJson()).toJavaObject(RdbmsConfig.class);
        // Fall back to the upper-cased username when no schema is configured
        this.schema = StringUtils.isNotBlank(datasourceInfo.getSchemaName()) ? datasourceInfo.getSchemaName() : commonLinkParams.getUsername().toUpperCase();
        datasourceInfo.setSchemaName(schema);
        return super.open(datasourceInfo);
    }

    // close() and testSource() are inherited from AbsRdbmsClient, so the Druid pool
    // is released on close and connection tests run against the real driver.
}

4. Configure the Redis subscription to the data source operations channel

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

/**
 * @author wangsheng
 * Date: 2022/8/16 16:40
 */
@Slf4j
@Configuration
public class RedisListenerConfig {

    /**
     * Subscribes to the data source operations channel.
     *
     * @param connectionFactory connectionFactory
     * @param dataSourceMonitor data source monitor
     * @return RedisMessageListenerContainer
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, DataSourceMonitor dataSourceMonitor) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // The name contains no wildcard, so this pattern matches exactly one channel
        container.addMessageListener(dataSourceMonitor, new PatternTopic("DATASOURCE_CHANNEL"));
        log.info("{} subscribed to channel {}", dataSourceMonitor.getClass().getName(), "DATASOURCE_CHANNEL");
        return container;
    }
}

5. Listen for data source operations via Redis


import com.alibaba.fastjson.JSONObject;
import com.ws.websocket.entity.DatasourceInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;

/**
 * Description: listens for data source operations via Redis
 * User: wangsheng
 * Date: 2022-08-12
 * Time: 17:07
 */
@Slf4j
@Component
public class DataSourceMonitor implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] bytes) {
        JSONObject box = JSONObject.parseObject(new String(message.getBody(), StandardCharsets.UTF_8));
        String operation = box.getString("key");
        if ("SAVE_OR_UPDATE".equals(operation)) {
            // Update DataSourceContext
            DatasourceInfo datasourceInfo = box.getObject("value", DatasourceInfo.class);
            // Only database sources (type 0) are cached; constant-first equals avoids an NPE on a null type.
            if (Integer.valueOf(0).equals(datasourceInfo.getType())) {
                String datasourceCode = datasourceInfo.getDatasourceCode();
                DataSourceContext.setConfigMap(datasourceCode, datasourceInfo);
                DataSourceContext.setClientMap(datasourceInfo);
                log.info("Redis listener: data source {} added or updated, DataSourceContext refreshed", datasourceCode);
            }
        } else {
            String datasourceCode = box.getString("value");
            // Update DataSourceContext
            DataSourceContext.removeConfigMap(datasourceCode);
            DataSourceContext.removeClientMap(datasourceCode);
            log.info("Redis listener: data source {} deleted, DataSourceContext refreshed", datasourceCode);
        }
    }
}
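
The listener above treats every message whose key is not SAVE_OR_UPDATE as a delete. A minimal sketch of a stricter dispatch that ignores unknown operations, assuming the envelope has already been parsed into a map. The class DatasourceEventDispatcher, the field names "code" and "linkJson", and the plain map standing in for DataSourceContext are all hypothetical and chosen only so the example runs without Spring, Redis, or fastjson.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the listener's dispatch logic; not from the original project. */
final class DatasourceEventDispatcher {

    // Plays the role of DataSourceContext's config map: data source code -> link JSON.
    private final Map<String, String> configCache = new ConcurrentHashMap<>();

    /**
     * Applies one already-parsed message envelope to the local cache.
     *
     * @return true if the envelope was recognised and applied, false if it was ignored
     */
    boolean dispatch(Map<String, String> box) {
        String operation = box.get("key");
        String code = box.get("code");
        if (operation == null || code == null) {
            return false;
        }
        switch (operation) {
            case "SAVE_OR_UPDATE": {
                String linkJson = box.get("linkJson");
                if (linkJson == null) {
                    return false; // incomplete message: leave the cache untouched
                }
                configCache.put(code, linkJson);
                return true;
            }
            case "DELETE":
                configCache.remove(code);
                return true;
            default:
                // Unknown operation: ignore it rather than deleting by accident.
                return false;
        }
    }

    boolean isCached(String code) {
        return configCache.containsKey(code);
    }

    public static void main(String[] args) {
        DatasourceEventDispatcher dispatcher = new DatasourceEventDispatcher();
        dispatcher.dispatch(Map.of("key", "SAVE_OR_UPDATE", "code", "dm01", "linkJson", "{}"));
        dispatcher.dispatch(Map.of("key", "RENAME", "code", "dm01"));
        System.out.println(dispatcher.isCached("dm01")); // prints true: the unknown operation was ignored
    }
}
```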

6. Create an AOP aspect that automatically detects data source changes


import com.alibaba.fastjson.JSONObject;
import com.ws.websocket.entity.DatasourceInfo;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * @Author: wangsheng
 * @Date: 2022/8/15 16:37
 */
@Slf4j
@Aspect
@Component
public class DatasourceAspect {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Publishes a Redis message when a data source is added or edited.
     */
    @AfterReturning(value = "execution(* com.ws.service.DatasourceInfoService.saveOrUpdateDatasourceInfo(..))", returning = "datasourceInfo")
    public void saveOrUpdate(JoinPoint joinPoint, DatasourceInfo datasourceInfo) {
        HashMap<String, Object> box = new HashMap<>(4);
        box.put("key", "SAVE_OR_UPDATE");
        box.put("value", datasourceInfo);
        // Publish the Redis message
        stringRedisTemplate.convertAndSend("DATASOURCE_CHANNEL", JSONObject.toJSONString(box));
        log.info("Aspect published Redis message for added or updated data source {}", datasourceInfo.getDatasourceCode());
    }

    /**
     * Publishes a Redis message when a data source is deleted.
     */
    @AfterReturning(value = "execution(* com.ws.service.DatasourceInfoService.deleteDatasourceInfo(..))", returning = "datasourceCode")
    public void delete(JoinPoint joinPoint, String datasourceCode) {
        Map<String, Object> box = new HashMap<>(4);
        box.put("key", "DELETE");
        box.put("value", datasourceCode);
        // Publish the Redis message
        stringRedisTemplate.convertAndSend("DATASOURCE_CHANNEL", JSONObject.toJSONString(box));
        log.info("Aspect published Redis message for deleted data source {}", datasourceCode);
    }
}
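
To illustrate why publishing to a channel keeps several application instances in sync, here is a minimal in-process sketch. It does not use Redis: the InMemoryChannel and AppNode classes and the "OPERATION:code" message format are hypothetical stand-ins chosen so the example runs on the JDK alone. It also shows one property that Redis Pub/Sub shares: a message reaches only the subscribers connected at publish time and is not stored for later ones.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical in-process stand-in for a Redis channel; illustration only. */
final class InMemoryChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    /** Delivers the message to everyone subscribed right now; nothing is stored. */
    int publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
        return subscribers.size();
    }
}

/** Hypothetical application instance with its own local data source cache. */
final class AppNode {
    private final Map<String, String> localCache = new HashMap<>();

    AppNode(InMemoryChannel channel) {
        channel.subscribe(this::onMessage);
    }

    // Messages have the form "OPERATION:datasourceCode" in this sketch.
    private void onMessage(String message) {
        String[] parts = message.split(":", 2);
        if (parts.length < 2) {
            return;
        }
        if ("SAVE_OR_UPDATE".equals(parts[0])) {
            localCache.put(parts[1], "connected");
        } else if ("DELETE".equals(parts[0])) {
            localCache.remove(parts[1]);
        }
    }

    boolean knows(String datasourceCode) {
        return localCache.containsKey(datasourceCode);
    }
}

final class BroadcastDemo {
    public static void main(String[] args) {
        InMemoryChannel channel = new InMemoryChannel();
        AppNode a = new AppNode(channel);
        AppNode b = new AppNode(channel);
        channel.publish("SAVE_OR_UPDATE:dm01");
        AppNode late = new AppNode(channel); // subscribes after the publish
        System.out.println(a.knows("dm01") + " " + b.knows("dm01") + " " + late.knows("dm01"));
        // prints: true true false
    }
}
```

An instance that starts later therefore still has to load the data sources itself at startup.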

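This solves the problem of automatically loading, updating, and synchronizing data source connection information. One problem remains: when a data source itself restarts, the cached connection becomes invalid, and the AOP aspect cannot detect that restart. A scheduled task is therefore also needed to test each data source connection and, if it has failed, reconnect and refresh the cache.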

7. Create the scheduled task

import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ws.websocket.entity.DatasourceInfo;
import com.ws.websocket.service.DatasourceInfoService;
import com.ws.websocket.util.DBTool;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;

@Component
@RequiredArgsConstructor
@Slf4j
public class DataSourceRetryConnectSchedule {

    @Resource
    private DatasourceInfoService datasourceInfoService;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    // Runs every 2 hours
    @Scheduled(cron = "0 0 */2 * * ?")
    public void retryConnect() {
        log.info("Starting data source connection check");
        QueryWrapper<DatasourceInfo> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("type", 0);
        List<DatasourceInfo> list = datasourceInfoService.list(queryWrapper);
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        for (DatasourceInfo datasourceInfo : list) {
            // Boolean.TRUE.equals also treats a null result as "not connected".
            boolean connected = Boolean.TRUE.equals(DBTool.testSource(datasourceInfo));
            if (!connected) {
                // Pass the name as a logging argument so the {} placeholder is filled in.
                log.info("Reconnecting data source {}", datasourceInfo.getDatasourceName());
                HashMap<String, Object> box = new HashMap<>(4);
                box.put("key", "SAVE_OR_UPDATE");
                box.put("value", datasourceInfo);
                // Publish the Redis message
                stringRedisTemplate.convertAndSend("DATASOURCE_CHANNEL", JSONObject.toJSONString(box));
            }
        }
    }
}
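
The check-and-republish loop above can also be sketched without Spring, which makes it easy to test in isolation. This is an illustration under stated assumptions, not the project's implementation: DatasourceHealthChecker is a hypothetical class, the probe and the republisher are injected as plain functions in place of DBTool.testSource and StringRedisTemplate, and a ScheduledExecutorService with a fixed delay replaces the @Scheduled cron trigger, so the timing differs from the cron expression.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical health checker; probe and republisher are injected so no Spring or Redis is needed. */
final class DatasourceHealthChecker {

    private final Predicate<String> probe;      // returns true when the data source answers
    private final Consumer<String> republisher; // stands in for publishing SAVE_OR_UPDATE

    DatasourceHealthChecker(Predicate<String> probe, Consumer<String> republisher) {
        this.probe = probe;
        this.republisher = republisher;
    }

    /** Probes every code once, republishes the ones that fail, and returns the failed codes. */
    List<String> checkOnce(List<String> datasourceCodes) {
        List<String> failed = new ArrayList<>();
        for (String code : datasourceCodes) {
            boolean healthy;
            try {
                healthy = probe.test(code);
            } catch (RuntimeException ex) {
                // A probe that throws must not abort the remaining checks.
                healthy = false;
            }
            if (!healthy) {
                failed.add(code);
                republisher.accept(code);
            }
        }
        return failed;
    }

    public static void main(String[] args) throws InterruptedException {
        DatasourceHealthChecker checker = new DatasourceHealthChecker(
                code -> !code.equals("dm02"), // pretend dm02 is down
                code -> System.out.println("republish SAVE_OR_UPDATE for " + code));
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // First run starts immediately; later runs begin 2 hours after the previous one ends.
        scheduler.scheduleWithFixedDelay(
                () -> checker.checkOnce(List.of("dm01", "dm02")), 0, 2, TimeUnit.HOURS);
        Thread.sleep(200); // give the first run time to finish in this demo
        scheduler.shutdownNow();
    }
}
```

Catching the probe's exception per data source matters with a scheduled executor: if the task throws, later runs are suppressed.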
