张开涛:京东业务数据应用级缓存示例

一、张开多级缓存API封装

我们的涛京业务数据如商品类目、店铺、东业商品基本信息都可以进行适当的用级本地缓存,以提升性能。缓存对于多实例的示例情况时不仅会使用本地缓存,还会使用分布式缓存,张开因此需要进行适当的涛京API封装以简化缓存操作。

1. 本地缓存初始化

public class LocalCacheInitService extends BaseService {     @Override     publicvoid afterPropertiesSet() throws Exception {          //商品类目缓存         Cache<String,东业 Object> categoryCache =                CacheBuilder.newBuilder()                         .softValues()                         .maximumSize(1000000)                        .expireAfterWrite(Switches.CATEGORY.getExpiresInSeconds()/ 2, TimeUnit.SECONDS)                         .build();        addCache(CacheKeys.CATEGORY_KEY, categoryCache);     }     privatevoid addCache(String key, Cache<?, ?> cache) {          localCacheService.addCache(key,cache);     } } 

本地缓存过期时间使用分布式缓存过期时间的一半,防止本地缓存数据缓存时间太长造成多实例间的用级数据不一致。

另外,缓存将缓存KEY前缀与本地缓存关联,示例从而匹配缓存KEY前缀就可以找到相关联的张开本地缓存。

2. 写缓存API封装

先写本地缓存,涛京如果需要写分布式缓存,东业则通过异步更新分布式缓存。

public void set(final String key, final Object value, final intremoteCacheExpiresInSeconds) throws RuntimeException {      if (value== null) {          return;     }     //复制值对象     //本地缓存是引用,分布式缓存需要序列化     //如果不复制的话,则假设之后数据改了将造成本地缓存与分布式缓存不一致     final Object finalValue = copy(value);     //如果配置了写本地缓存,则根据KEY获得相关的本地缓存,然后写入     if (writeLocalCache) {         Cache localCache = getLocalCache(key);         if(localCache != null) {             localCache.put(key, finalValue);         }     }     //如果配置了不写分布式缓存,则直接返回     if (!writeRemoteCache) {          return;     }     //异步更新分布式缓存     asyncTaskExecutor.execute(() -> {          try {              redisCache.set(key,JSONUtils.toJSON(finalValue), remoteCacheExpiresInSeconds);         } catch(Exception e) {              LOG.error("updateredis cache error, key : { }", key, e);         }     }); } 

此处使用了异步更新,目的是云南idc服务商让用户请求尽快返回。而因为有本地缓存,所以即使分布式缓存更新比较慢又产生了回源,也可以在本地缓存***。

3. 读缓存API封装

先读本地缓存,本地缓存不***的再批量查询分布式缓存,在查询分布式缓存时通过分区批量查询。

private Map innerMget(List<String> keys, List<Class> types) throwsException {     Map<String, Object> result = Maps.newHashMap();    List<String> missKeys = Lists.newArrayList();    List<Class> missTypes = Lists.newArrayList();     //如果配置了读本地缓存,则先读本地缓存     if(readLocalCache) {          for(int i = 0; i < keys.size(); i++) {             String key = keys.get(i);            Class type = types.get(i);            Cache localCache = getLocalCache(key);             if(localCache != null) {                 Object value = localCache.getIfPresent(key);                result.put(key, value);                if (value == null) {                     missKeys.add(key);                     missTypes.add(type);                }            } else {                 missKeys.add(key);                missTypes.add(type);            }         }     }     //如果配置了不读分布式缓存,则返回     if(!readRemoteCache) {          returnresult;     }     finalMap<String, String> missResult = Maps.newHashMap();     //对KEY分区,不要一次性批量调用太大     final List<List<String>>keysPage = Lists.partition(missKeys, 10);    List<Future<Map<String, String>>> pageFutures = Lists.newArrayList();     try {          //批量获取分布式缓存数据         for(final List<String>partitionKeys : keysPage) {             pageFutures.add(asyncTaskExecutor.submit(() -> redisCache.mget(partitionKeys)));         }         for(Future<Map<String,String>> future : pageFutures) {             missResult.putAll(future.get(3000, TimeUnit.MILLISECONDS));         }     } catch(Exception e) {         pageFutures.forEach(future -> future.cancel(true));         throw e;     }     //合并result和missResult,此处实现省略     return result; } 

此处将批量读缓存进行了分区,防止乱用批量获取API。

二、NULL Cache

首先,定义NULL对象。

private static final String NULL_STRING =new String(); 

当DB没有数据时,写入NULL对象到缓存

//查询DB String value = loadDB(); //如果DB没有数据,则将其封装为NULL_STRING并放入缓存 if(value == null) {      value = NULL_STRING; } myCache.put(id, value); 

读取数据时,如果发现NULL对象,则返回null,而不是回源到DB

value = suitCache.getIfPresent(id); //DB没有数据,返回null if(value == NULL_STRING) {      return null; } 

通过这种方式可以防止当KEY对应的数据在DB不存在时频繁查询DB的情况。

三、强制获取***数据

在实际应用中,我们经常需要强制更新数据,此时就不能使用缓存数据了,可以通过配置ThreadLocal开关来决定是否强制刷新缓存(refresh方法要配合CacheLoader一起使用)。源码下载

if(ForceUpdater.isForceUpdateMyInfo()) {      myCache.refresh(skuId); } String result = myCache.get(skuId); if(result == NULL_STRING) {      return null; } 

四、失败统计

private LoadingCache<String, AtomicInteger> failedCache =        CacheBuilder.newBuilder()                .softValues()                .maximumSize(10000)                .build(new CacheLoader<String, AtomicInteger>() {                     @Override                     public AtomicIntegerload(String skuId) throws Exception {                          return new AtomicInteger(0);                    }                }); 

当失败时,通过failedCache.getUnchecked(id).incrementAndGet()增加失败次数;当成功时,使用failedCache.invalidate(id)失效缓存。通过这种方式可以控制失败重试次数,而且又是内存敏感缓存。当内存不足时,可以清理该缓存腾出一些空间。

五、延迟报警

private static LoadingCache<String, Integer> alarmCache =        CacheBuilder.newBuilder()                 .softValues()                .maximumSize(10000).expireAfterAccess(1, TimeUnit.HOURS)                .build(new CacheLoader<String, Integer>() {                     @Override                    public Integer load(String key) throws Exception {                          return 0;                    }                }); //报警代码 Integer count = 0; if(redis != null) {      StringcountStr = Objects.firstNonNull(redis.opsForValue().get(key), "0");     count =Integer.valueOf(countStr); } else {      count = alarmCache.get(key); } if(count % 5 == 0) {  //5次报一次     //报警 } countcount = count + 1; if(redis != null) {      redis.opsForValue().set(key,String.valueOf(count), 1, TimeUnit. HOURS); } else {      alarmCache.put(key,count); } 

如果一出问题就报警,则存在报警量非常多或者假报警,因此,可以考虑N久报警了M次,才真正报警。此时,也可以使用Cache来统计。本示例还加入了Redis分布式缓存记录支持。

六、性能测试

笔者使用JMH 1.14进行基准性能测试,比如测试写。

@Benchmark @Warmup(iterations = 10, time = 10, timeUnit =TimeUnit.SECONDS) @Measurement(iterations = 10, time = 10, timeUnit= TimeUnit.SECONDS) @BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.SECONDS) @Fork(1) public void test_1_Write() {      counterWritercounterWriter= counterWriter + 1;     myCache.put("key"+ counterWriter, "value" + counterWriter); } 

使用JMH时首先进行JVM预热,然后进行度量,产生测试结果(本文使用吞吐量)。建议读者按照需求进行基准性能测试来选择适合自己的缓存框架。

【本文是专栏作者张开涛的原创文章,作者微信公众号:开涛的博客( kaitao-1234567)】

戳这里,看该作者更多好文

滇ICP备2023000592号-31