张开涛:京东业务数据应用级缓存示例

我们的业务数据如商品类目、店铺、商品基本信息都可以进行适当的本地缓存,以提升性能。对于多实例的情况时不仅会使用本地缓存,还会使用分布式缓存,因此需要进行适当的API封装以简化缓存操作。 2017-04-21 08:51:42 API缓存分布式 继续探索with语句的奇妙之处 在上一篇博客《漂亮的with,鱼与熊掌可以兼得》中,展现了with的优雅之处,然而在比较with与|>时,言犹未尽,讲得不够透彻。本文我们继续探索with语句的奇妙之处。 2017-04-21 08:32:25 with磁盘数据 80%的Java程序员不知道反射强行调用私有构造器这事儿 在我之前的一篇文章里曾提到一个观点:“可能会有人使用反射强行调用我们的私有构造器”,很多童鞋不明白Java反射机制怎么做到调用私有构造器,今天我们来做一个实验。

我们的业务数据如商品类目、店铺、商品基本信息都可以进行适当的本地缓存,以提升性能。对于多实例的情况时不仅会使用本地缓存,还会使用分布式缓存,因此需要进行适当的API封装以简化缓存操作。

一、多级缓存API封装

我们的业务数据如商品类目、店铺、商品基本信息都可以进行适当的本地缓存,以提升性能。对于多实例的情况时不仅会使用本地缓存,还会使用分布式缓存,因此需要进行适当的API封装以简化缓存操作。

[[189271]]

1. 本地缓存初始化

  1. publicclassLocalCacheInitServiceextendsBaseService{
  2. @Override
  3. publicvoidafterPropertiesSet()throwsException{
  4. //商品类目缓存
  5. Cache<String,Object>categoryCache=
  6. CacheBuilder.newBuilder()
  7. .softValues()
  8. .maximumSize(1000000)
  9. .expireAfterWrite(Switches.CATEGORY.getExpiresInSeconds()/2,TimeUnit.SECONDS)
  10. .build();
  11. addCache(CacheKeys.CATEGORY_KEY,categoryCache);
  12. }
  13. privatevoidaddCache(Stringkey,Cache<?,?>cache){
  14. localCacheService.addCache(key,cache);
  15. }
  16. }

本地缓存过期时间使用分布式缓存过期时间的一半,防止本地缓存数据缓存时间太长造成多实例间的数据不一致。

另外,将缓存KEY前缀与本地缓存关联,从而匹配缓存KEY前缀就可以找到相关联的本地缓存。

2. 写缓存API封装

先写本地缓存,如果需要写分布式缓存,则通过异步更新分布式缓存。

  1. publicvoidset(finalStringkey,finalObjectvalue,finalintremoteCacheExpiresInSeconds)throwsRuntimeException{
  2. if(value==null){
  3. return;
  4. }
  5. //复制值对象
  6. //本地缓存是引用,分布式缓存需要序列化
  7. //如果不复制的话,则假设之后数据改了将造成本地缓存与分布式缓存不一致
  8. finalObjectfinalValue=copy(value);
  9. //如果配置了写本地缓存,则根据KEY获得相关的本地缓存,然后写入
  10. if(writeLocalCache){
  11. CachelocalCache=getLocalCache(key);
  12. if(localCache!=null){
  13. localCache.put(key,finalValue);
  14. }
  15. }
  16. //如果配置了不写分布式缓存,则直接返回
  17. if(!writeRemoteCache){
  18. return;
  19. }
  20. //异步更新分布式缓存
  21. asyncTaskExecutor.execute(()->{
  22. try{
  23. redisCache.set(key,JSONUtils.toJSON(finalValue),remoteCacheExpiresInSeconds);
  24. }catch(Exceptione){
  25. LOG.error("updaterediscacheerror,key:{}",key,e);
  26. }
  27. });
  28. }

此处使用了异步更新,目的是让用户请求尽快返回。而因为有本地缓存,所以即使分布式缓存更新比较慢又产生了回源,也可以在本地缓存***。

3. 读缓存API封装

先读本地缓存,本地缓存不***的再批量查询分布式缓存,在查询分布式缓存时通过分区批量查询。

  1. privateMapinnerMget(List<String>keys,List<Class>types)throwsException{
  2. Map<String,Object>result=Maps.newHashMap();
  3. List<String>missKeys=Lists.newArrayList();
  4. List<Class>missTypes=Lists.newArrayList();
  5. //如果配置了读本地缓存,则先读本地缓存
  6. if(readLocalCache){
  7. for(inti=0;i<keys.size();i++){
  8. Stringkey=keys.get(i);
  9. Classtype=types.get(i);
  10. CachelocalCache=getLocalCache(key);
  11. if(localCache!=null){
  12. Objectvalue=localCache.getIfPresent(key);
  13. result.put(key,value);
  14. if(value==null){
  15. missKeys.add(key);
  16. missTypes.add(type);
  17. }
  18. }else{
  19. missKeys.add(key);
  20. missTypes.add(type);
  21. }
  22. }
  23. }
  24. //如果配置了不读分布式缓存,则返回
  25. if(!readRemoteCache){
  26. returnresult;
  27. }
  28. finalMap<String,String>missResult=Maps.newHashMap();
  29. //对KEY分区,不要一次性批量调用太大
  30. finalList<List<String>>keysPage=Lists.partition(missKeys,10);
  31. List<Future<Map<String,String>>>pageFutures=Lists.newArrayList();
  32. try{
  33. //批量获取分布式缓存数据
  34. for(finalList<String>partitionKeys:keysPage){
  35. pageFutures.add(asyncTaskExecutor.submit(()->redisCache.mget(partitionKeys)));
  36. }
  37. for(Future<Map<String,String>>future:pageFutures){
  38. missResult.putAll(future.get(3000,TimeUnit.MILLISECONDS));
  39. }
  40. }catch(Exceptione){
  41. pageFutures.forEach(future->future.cancel(true));
  42. throwe;
  43. }
  44. //合并result和missResult,此处实现省略
  45. returnresult;
  46. }

此处将批量读缓存进行了分区,防止乱用批量获取API。

二、NULL Cache

首先,定义NULL对象。

  1. privatestaticfinalStringNULL_STRING=newString();

当DB没有数据时,写入NULL对象到缓存

  1. //查询DB
  2. Stringvalue=loadDB();
  3. //如果DB没有数据,则将其封装为NULL_STRING并放入缓存
  4. if(value==null){
  5. value=NULL_STRING;
  6. }
  7. myCache.put(id,value);

读取数据时,如果发现NULL对象,则返回null,而不是回源到DB

  1. value=suitCache.getIfPresent(id);
  2. //DB没有数据,返回null
  3. if(value==NULL_STRING){
  4. returnnull;
  5. }

通过这种方式可以防止当KEY对应的数据在DB不存在时频繁查询DB的情况。

三、强制获取***数据

在实际应用中,我们经常需要强制更新数据,此时就不能使用缓存数据了,可以通过配置ThreadLocal开关来决定是否强制刷新缓存(refresh方法要配合CacheLoader一起使用)。

  1. if(ForceUpdater.isForceUpdateMyInfo()){
  2. myCache.refresh(skuId);
  3. }
  4. Stringresult=myCache.get(skuId);
  5. if(result==NULL_STRING){
  6. returnnull;
  7. }

四、失败统计

  1. privateLoadingCache<String,AtomicInteger>failedCache=
  2. CacheBuilder.newBuilder()
  3. .softValues()
  4. .maximumSize(10000)
  5. .build(newCacheLoader<String,AtomicInteger>(){
  6. @Override
  7. publicAtomicIntegerload(StringskuId)throwsException{
  8. returnnewAtomicInteger(0);
  9. }
  10. });

当失败时,通过failedCache.getUnchecked(id).incrementAndGet()增加失败次数;当成功时,使用failedCache.invalidate(id)失效缓存。通过这种方式可以控制失败重试次数,而且又是内存敏感缓存。当内存不足时,可以清理该缓存腾出一些空间。

五、延迟报警

  1. privatestaticLoadingCache<String,Integer>alarmCache=
  2. CacheBuilder.newBuilder()
  3. .softValues()
  4. .maximumSize(10000).expireAfterAccess(1,TimeUnit.HOURS)
  5. .build(newCacheLoader<String,Integer>(){
  6. @Override
  7. publicIntegerload(Stringkey)throwsException{
  8. return0;
  9. }
  10. });
  11. //报警代码
  12. Integercount=0;
  13. if(redis!=null){
  14. StringcountStr=Objects.firstNonNull(redis.opsForValue().get(key),"0");
  15. count=Integer.valueOf(countStr);
  16. }else{
  17. count=alarmCache.get(key);
  18. }
  19. if(count%5==0){//5次报一次
  20. //报警
  21. }
  22. countcount=count+1;
  23. if(redis!=null){
  24. redis.opsForValue().set(key,String.valueOf(count),1,TimeUnit.HOURS);
  25. }else{
  26. alarmCache.put(key,count);
  27. }

如果一出问题就报警,则存在报警量非常多或者假报警,因此,可以考虑N久报警了M次,才真正报警。此时,也可以使用Cache来统计。本示例还加入了Redis分布式缓存记录支持。

六、性能测试

笔者使用JMH 1.14进行基准性能测试,比如测试写。

  1. @Benchmark
  2. @Warmup(iterations=10,time=10,timeUnit=TimeUnit.SECONDS)
  3. @Measurement(iterations=10,time=10,timeUnit=TimeUnit.SECONDS)
  4. @BenchmarkMode(Mode.Throughput)
  5. @OutputTimeUnit(TimeUnit.SECONDS)
  6. @Fork(1)
  7. publicvoidtest_1_Write(){
  8. counterWritercounterWriter=counterWriter+1;
  9. myCache.put("key"+counterWriter,"value"+counterWriter);
  10. }

使用JMH时首先进行JVM预热,然后进行度量,产生测试结果(本文使用吞吐量)。建议读者按照需求进行基准性能测试来选择适合自己的缓存框架。

【本文是清一色专栏作者张开涛的原创文章,作者微信公众号:开涛的博客( kaitao-1234567)】

戳这里,看该作者更多好文

©本文为清一色官方代发,观点仅代表作者本人,与清一色无关。清一色对文中陈述、观点判断保持中立,不对所包含内容的准确性、可靠性或完整性提供任何明示或暗示的保证。本文不作为投资理财建议,请读者仅作参考,并请自行承担全部责任。文中部分文字/图片/视频/音频等来源于网络,如侵犯到著作权人的权利,请与我们联系(微信/QQ:1074760229)。转载请注明出处:清一色财经

(0)
打赏 微信扫码打赏 微信扫码打赏 支付宝扫码打赏 支付宝扫码打赏
清一色的头像清一色管理团队
上一篇 2023年5月4日 22:51
下一篇 2023年5月4日 22:52

相关推荐

发表评论

登录后才能评论

联系我们

在线咨询:1643011589-QQbutton

手机:13798586780

QQ/微信:1074760229

QQ群:551893940

工作时间:工作日9:00-18:00,节假日休息

关注微信