25轉(zhuǎn)行做網(wǎng)站運營優(yōu)化搜狗排名
Redis 批量處理
在開發(fā)中,有時需要對Redis 進行大批量的處理。
比如Redis批量查詢多個Hash。如果是在for循環(huán)中逐個查詢,那性能會很差。
這時,可以使用 Pipeline (管道)。
Pipeline (管道)
Pipeline (管道) 可以一次性發(fā)送多條命令并在執(zhí)行完后一次性將結(jié)果返回,pipeline 通過減少客戶端與 redis 的通信次數(shù)來實現(xiàn)降低往返延時時間,而且 Pipeline 實現(xiàn)的原理是隊列,而隊列的原理是時先進先出,這樣就保證數(shù)據(jù)的順序性。
RedisTemplate的管道操作
RedisTemplate的管道操作,使用**executePipelined()**方法。
org.springframework.data.redis.core.RedisTemplate#executePipelined(org.springframework.data.redis.core.SessionCallback<?>)
@Overridepublic List<Object> executePipelined(SessionCallback<?> session, @Nullable RedisSerializer<?> resultSerializer) {Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");Assert.notNull(session, "Callback object must not be null");RedisConnectionFactory factory = getRequiredConnectionFactory();// 綁定連接RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);try {return execute((RedisCallback<List<Object>>) connection -> {//打開管道connection.openPipeline();boolean pipelinedClosed = false;try {Object result = executeSession(session);//callback的返回值,只能返回null,否則會報錯。if (result != null) {throw new InvalidDataAccessApiUsageException("Callback cannot return a non-null value as it gets overwritten by the pipeline");}//關(guān)閉管道,并獲取返回值List<Object> closePipeline = connection.closePipeline();pipelinedClosed = true;return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);} finally {if (!pipelinedClosed) {connection.closePipeline();}}});} finally {RedisConnectionUtils.unbindConnection(factory);}}
- 注意:
(1) executePipelined()的callback參數(shù),實現(xiàn)execute() 方法只能返回null,否則會報錯。
報錯: InvalidDataAccessApiUsageException: Callback cannot return a non-null value as it gets overwritten by the pipeline
源碼如下:
Object result = executeSession(session);
if (result != null) {
throw new InvalidDataAccessApiUsageException(
"Callback cannot return a non-null value as it gets overwritten by the pipeline");
}
(2)在executePipelined()中, 使用 get()方法 得到的value會是null。
在 redisTemplate進行管道操作,結(jié)果值只能通過 executePipelined()方法的返回值List獲取.
/*** Get the value of {@code key}.** @param key must not be {@literal null}.* 在管道(pipeline)中使用 get()方法 得到的value會是null* @return {@literal null} when used in pipeline / transaction.*/@NullableV get(Object key);
- redisTemplate獲取管道返回值:
List<Object> list = stringRedisTemplate.executePipelined(new SessionCallback<String>() {@Overridepublic String execute(@NonNull RedisOperations operations) throws DataAccessException {//idList是多個id的集合for (String id : idList) {//key由前綴加唯一id組成String key = KEY_PREFIX + id;//在管道中使用 get()方法 得到的value會是null。value通過executePipelined()的返回值List<Object>獲取。operations.opsForHash().get(key, field);}//Callback只能返回null, 否則報錯:// InvalidDataAccessApiUsageException: Callback cannot return a non-null value as it gets overwritten by the pipelinereturn null;}});list.forEach(System.out::println);
Jedis操作管道
RedisTemplate操作管道比較方便,但如果要組裝key和value的map,就會比較麻煩。
在這種情況下,可以使用Jedis。
比如,Jedis批量查詢多個Hash,可以使用 Pipeline (管道)。
源碼見: redis.clients.jedis.PipelineBase#hget(java.lang.String, java.lang.String)
hget()方法,跟普通hash的hget()有點類似,不過返回值是 Response。
public Response<String> hget(String key, String field) {this.getClient(key).hget(key, field);return this.getResponse(BuilderFactory.STRING);}
示例:
public void testPipLine() {Map<String, Response<String>> responseMap = new HashMap<>();//try-with-resources, 自動關(guān)閉資源//先連接jedis,再拿到 pipelinetry (Jedis jedis = getJedis();Pipeline pipeline = jedis.pipelined()) {for (String id : idList) {//前綴加唯一idString key = KEY_PREFIX + id;//使用pipeline.hget查詢hash的數(shù)據(jù)Response<String> response = pipeline.hget(key, field);responseMap.put(id, response);}pipeline.sync();} catch (Exception ex) {log.error("responses error.", ex);}Map<String, String> map = new HashMap<>();//組裝map。response.get()在pipeline關(guān)閉后才能執(zhí)行,否則拿到的value都是nullresponseMap.forEach((k,response) -> map.put(k, response.get()));map.forEach((k,v)-> System.out.println(k+",val:"+v));}private static Pool<Jedis> jedisPool = null;/*** 連接redis,獲取jedisPool* @return*/public Jedis getJedis() {if (jedisPool == null) {JedisPoolConfig poolConfig = new JedisPoolConfig();poolConfig.setMaxTotal(maxTotal);poolConfig.setMaxIdle(maxIdle);poolConfig.setMaxWaitMillis(maxWaitMillis);poolConfig.setTestOnBorrow(testOnBorrow);//配置可以寫在配置中心/文件jedisPool = new JedisPool(poolConfig, host, port, timeout, password, database);}return jedisPool.getResource();}
參考資料
https://redis.io/docs/manual/pipelining/
https://www.cnblogs.com/expiator/p/11127719.html