你的位置:首页 > Java教程

[Java教程]redis队列及多线程应用


  由于xxx平台上自己的博客已经很久没更新了,一直以来都是用的印象笔记来做工作中知识的积累存根,不知不觉印象笔记里已经有了四、五百遍文章。为了从新开始能与广大攻城狮共同提高技术能力与水平,随决心另起炉灶在新的博客与大家分享

  经过一段时间项目的沉淀之后,对实际应用中的多线程开发及队列使用产生了深厚的兴趣,也将<<java并发编程实战>>仔细的阅读了两三遍,也看了很多并发编程的实践项目,也有了深刻的理解与在实践中合理应用队列、多线程开发的应用场景

  1、真实应用场景描述:

   由于一段时间以来要针对公司整个电商平台包括官网、移动端所有的交易数据进行统计,统计指标包括:pv、uv、实付金额、转化率、毛利率等等,按照各种不同的维度来统计计算出当前交易系统的各个指标的数据,但要求该项目是独立的,没有任务其它资源的协助及接品提供。经过一番xxxx思考讨论之后。业务上决定用以下解决方案:

    A: 用一个定时服务每隔10秒去别的系统数据库抓取上一次查询时间以来新确认的订单(这种订单表示已经支付完在或者客户已经审核确认了),然后将这些订单的唯一编号放入redis队列。

    B: 由于用到了队列,根据经验自然而然的想到了  启动单独的线程去redis队列中不断获取要统计处理的订单编号,然后将获取到的订单编号放入线程池中进行订单的统计任务处理。

 

    开发实现:

    FetchConfirmOrdersFromErpJob.java

 1 /** 2    * 1、从redis中获取上次查询的时间戳 3    * 2、将当前时间戳放入到redis中,以便 下次按这个时间查询 4    * 3、去erp订单表查询confirm_time>=上次查询的时间的订单,放入队列中 5   */ 6   @Scheduled(cron = "0/30 * * * * ?") 7   public void start(){ 8     logger.info("FetchConfirmOrdersFromErpJob start................."+ new Date()); 9     StopWatch watch=new StopWatch();10     watch.start();11     //上次查询的时间12     String preQueryTimeStr=this.readRedisService.get(Constans.CACHE_PREQUERYORDERTIME);13     14     Date now=new Date();15     if(StringUtils.isBlank(preQueryTimeStr)){16       preQueryTimeStr=DateFormatUtils.format(DateUtils.addHours(now, -1), Constans.DATEFORMAT_PATTERN_YYYYMMDDHHMMSS);//第一次查询之前一个小时的订单17 //      preQueryTimeStr="2015-05-07 10:00:00";//本地测试的时候使用18     }19     //设置当前时间为上次查询的时间20     this.writeRedisService.set(Constans.CACHE_PREQUERYORDERTIME, DateFormatUtils.format(now, Constans.DATEFORMAT_PATTERN_YYYYMMDDHHMMSS));21     22     List<Map<String, Object>> confirmOrderIds = this.erpOrderService.selectOrderIdbyConfirmtime(preQueryTimeStr);23     if(confirmOrderIds==null){24       logger.info("query confirmOrderIds is null,without order data need dealth..........");25       return;26     }27     for (Map<String, Object> map : confirmOrderIds) {
         //将订单编号放入队列中28 this.writeRedisService.lpush(Constans.CACHE_ORDERIDS, map.get("channel_orderid").toString());29 logger.info("=======lpush orderid:"+map.get("channel_orderid").toString());30 }31 32 watch.stop();33 logger.info("FetchConfirmOrdersFromErpJob end................."+ new Date()+" total cost time:"+watch.getTime()+" dealth data count:"+confirmOrderIds.size());34 }

 

    OrderCalculate.java    队列获取订单线程

 1 public class OrderCalculate { 2  3   private static final Log logger = LogFactory.getLog(OrderCalculate.class); 4    5   @Autowired 6   private static WriteRedisService writeRedisService; 7    8   private static ExecutorService threadPool=Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*4 9       ,new TjThreadFactory("CalculateAmount"));10   static{11     Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {12       @Override13       public void run() {14         QueuePop.stop();15         threadPool.shutdown();16       }17     }));18   }19   20   public void init(){21     if(writeRedisService==null){22       writeRedisService=SpringContext.getBean(WriteRedisService.class);23     }24     new Thread(new QueuePop(),"OrderIdQueuePop").start();//由于是用redis做的队列,所以只要使用一个线程从队列里拿就ok25   }26   27   static class QueuePop implements Runnable{28 29     volatile static boolean stop=false;30     31     @Override32     public void run() {33       while(!stop){34         //不断循环从队列里取出订单id35         String orderId=null;36         try {37           orderId = writeRedisService.rpop(Constans.CACHE_ORDERIDS);38           if(orderId!=null){39             logger.info("pop orderId:"+orderId);
                //将获取的订单编号交给订单统计任务处理线程处理40 threadPool.submit(new CalculateAmount(Integer.parseInt(orderId),new Date()));41 }42 } catch (Exception e1) {43 logger.error("",e1);44 }45 //根据上线后的业务反馈来确定是否改成wait/notify策略来及时处理确认的订单46 try {47 Thread.sleep(10);48 } catch (InterruptedException e) {49 logger.error("",e);50 // Thread.currentThread().interrupt();51 //stop=true;//线程被打算继续执行,不应该被关闭,保证该线程永远不会死掉52 }53 }54 }55 56 public static void stop(){57 stop=true;58 }59 60 }61 62 }

 

      CalculateAmoiunt.java   订单任务处理

 1 public class CalculateAmount implements Runnable { 2   private static final Log logger = LogFactory.getLog(CalculateAmount.class); 3   private int orderId; 4   private Date now;//确认时间 这个时间有一定的延迟,基本可以忽略,如果没什么用 5   private OrderService orderServices; 6   private OrdHaveProductService ordHaveProductService; 7   private OrdPayByCashbackService ordPayByCashbackService; 8   private OrdPayByCouponService ordPayByCouponService; 9   private OrdPayByGiftCardService ordPayByGiftCardService; 10   private StatisticsService statisticsService; 11   private WriteRedisService writeRedisService; 12   private ReadRedisService readRedisService; 13   private ErpOrderGoodsService erpOrderGoodsService; 14   private ErpOrderService erpOrderService; 15    16    17   public CalculateAmount(int orderId,Date now) { 18     super(); 19     this.orderId = orderId; 20     this.now=now; 21     orderServices=SpringContext.getBean(OrderService.class); 22     ordHaveProductService=SpringContext.getBean(OrdHaveProductService.class); 23     ordPayByCashbackService=SpringContext.getBean(OrdPayByCashbackService.class); 24     ordPayByCouponService=SpringContext.getBean(OrdPayByCouponService.class); 25     ordPayByGiftCardService=SpringContext.getBean(OrdPayByGiftCardService.class); 26     statisticsService=SpringContext.getBean(StatisticsService.class); 27     writeRedisService=SpringContext.getBean(WriteRedisService.class); 28     readRedisService=SpringContext.getBean(ReadRedisService.class); 29     erpOrderGoodsService=SpringContext.getBean(ErpOrderGoodsService.class); 30     erpOrderService=SpringContext.getBean(ErpOrderService.class); 31   } 32  33   @Override 34   public void run() { 35     logger.info("CalculateAmount task run start........orderId:"+orderId); 36     StopWatch watch=new StopWatch(); 37     watch.start(); 38     /** 39      * 取出订单相关的所有数据同步到统计的库中 40     */ 41     //TODO 考虑要不要将下面所有操作放到一个事务里面 42     List<Map<String, Object>> orders = this.orderServices.selectOrderById(orderId); 43     if(orders!=null&&orders.size()>0){ 44       Map<String, Object> order = orders.get(0); 45        46       String orderSN=U.nvl(order.get("OrderSN"));//订单编号 47       Integer userId=U.nvlInt(order.get("usr_UserID"),null);//用户d 48       Integer status=U.nvlInt(order.get("Status"),null);//状态 49       Date createTime=now;//(Date)order.get("CreateTime");//创建时间 50       Date modifyTime=now;//(Date)order.get("ModifyTime");// 更新时间 51       BigDecimal discountPrice=U.nvlDecimal(order.get("DiscountPrice"),null);//优惠总额 满减金额 52       BigDecimal payPrice=U.nvlDecimal(order.get("PayPrice"), null);//实付金额 53       BigDecimal totalPrice=U.nvlDecimal(order.get("TotalPrice"), null);//总金额 54        55       //从erp里查询出订单的确认时间 56       int dbConfirmTime=0; 57       try { 58         dbConfirmTime = this.erpOrderService.selectConfirmTimeByOrderId(orderId); 59       } catch (Exception e2) { 60         logger.error("",e2); 61       } 62       Date ct=new Date(dbConfirmTime*1000L); 63        64       int[] dates=U.getYearMonthDayHour(ct);// 65       if(modifyTime!=null){ 66         dates=U.getYearMonthDayHour(modifyTime);// 67       } 68       int year=dates[0];//年 69       int month=dates[1];//月 70       int day=dates[2];//日 71       int hour=dates[3];//小时 72        73       String ordersId=orderId+"";//生成订单id 74        75       //查询订单的来源和搜索引擎关键字 76       String source=""; 77       String seKeyWords=""; 78       List<OrdersData> orderDataList=this.statisticsService.selectOrdersDataByOrdersId(orderSN); 79       if(orderDataList!=null&&!orderDataList.isEmpty()){ 80         OrdersData ordersData = orderDataList.get(0); 81         source=ordersData.getSource(); 82         seKeyWords=ordersData.getSeKeyWords(); 83       } 84        85       //TODO 将订单入库 86       ArrayList<RelOrders> relOrdersList = Lists.newArrayList(); 87       RelOrders relOrders=new RelOrders(orderSN,userId+"",Byte.valueOf(status+""),source,seKeyWords,IsCal.未计算.getFlag(),(byte)U.getSimpleYearByYear(year),(byte)month,(byte)day,(byte)hour,ct,createTime,modifyTime); 88       relOrdersList.add(relOrders); 89        90       try { 91         relOrders.setConfirmTime(ct); 92         //查询RelOrders是否存在 93         RelOrders dbOrders=this.statisticsService.selectByPrimaryKey(orderSN); 94         if(dbOrders!=null){ 95           //更新 96           dbOrders.setStatus(Byte.valueOf(status+"")); 97           dbOrders.setConfirmTime(ct); 98           dbOrders.setModifyTime(modifyTime); 99           this.statisticsService.updateByPrimaryKeySelective(dbOrders);100           return;101         }else{102           Integer relResult=this.statisticsService.insertRelOrdersBatch(relOrdersList);103         }104       } catch (Exception e) {105         logger.error("insertRelOrdersBatch error",e);106       }107       /**108        * 查这个订单的返现、优惠券、礼品卡 的金额109       */110       List<Map<String, Object>> cashs = this.ordPayByCashbackService.selectDecutionPriceByOrderId(orderId);111       List<Map<String, Object>> coupons = this.ordPayByCouponService.selectDecutionPriceByOrderId(orderId);112       113       BigDecimal cashAmount=U.getValueByKey(cashs, "DeductionPrice", BigDecimal.class, BigDecimal.ZERO);//返现金额114       BigDecimal couponAmont=U.getValueByKey(coupons, "DeductionPrice", BigDecimal.class, BigDecimal.ZERO);//红包金额115       /**116        * 查询出这个订单的所有商品117       */118       List<Map<String, Object>> products=null;119       Map<String,Object> productToKeyWordMap=Maps.newHashMap();120       try {121         products = this.ordHaveProductService.selectByOrderId(orderId);122         List<OrdersItemData> ordersItemDataList=this.statisticsService.selectOrdersItemDataByOrdersId(orderSN);123         if(ordersItemDataList!=null){124           for (OrdersItemData ordersItemData : ordersItemDataList) {125             productToKeyWordMap.put(ordersItemData.getItemId(), ordersItemData.getKeyWords());126           }127         }128       } catch (Exception e1) {129         logger.error("",e1);130       }131       if(products!=null){132         ArrayList<RelOrdersItem> relOrdersItemList = Lists.newArrayList();133         for (Map<String, Object> product : products) {134           Integer productId=U.nvlInt(product.get("pro_ProductID"), null);//商品Id135           Integer buyNo=U.nvlInt(product.get("BuyNo"), 0);//购买数量136           String SN=U.nvl(product.get("SN"),"");137           BigDecimal buyPrice=U.nvlDecimal(product.get("BuyPrice"), BigDecimal.ZERO);//购买价格138           BigDecimal buyTotalPrice=U.nvlDecimal(product.get("BuyTotalPrice"), null);//购买总价格139           BigDecimal productPayPrice=U.nvlDecimal(product.get("PayPrice"), null);//单品实付金额140           141           BigDecimal cost=null;//商品成本 TODO 调别人的接口142           BigDecimal realtimeAmount=null;//实付金额143           144           BigDecimal pdCashAmount=BigDecimal.ZERO;//每个商品的返现145           BigDecimal pdcouponAmont=BigDecimal.ZERO;//每个商品的优惠券146           147           //商品价格所占订单比例148           if(buyTotalPrice!=null&&totalPrice!=null&&totalPrice.doubleValue()!=0){149             pdCashAmount=buyTotalPrice.divide(totalPrice,8,BigDecimal.ROUND_HALF_UP).multiply(cashAmount).setScale(2,BigDecimal.ROUND_HALF_UP);150             pdcouponAmont=buyTotalPrice.divide(totalPrice,8,BigDecimal.ROUND_HALF_UP).multiply(couponAmont).setScale(2,BigDecimal.ROUND_HALF_UP);151             discountPrice=buyTotalPrice.divide(totalPrice,8,BigDecimal.ROUND_HALF_UP).multiply(discountPrice).setScale(2,BigDecimal.ROUND_HALF_UP);152           }153           154           realtimeAmount=buyTotalPrice.subtract((pdCashAmount.add(pdcouponAmont).add(discountPrice))).setScale(2,BigDecimal.ROUND_HALF_UP);155           156           RelOrdersItem item=new RelOrdersItem(U.randomUUID(),orderSN,productId,SN,buyNo,realtimeAmount,U.nvl(productToKeyWordMap.get(productId)));157           158           relOrdersItemList.add(item);159           160           //如果确认时间属于同一天的话,将商品实付金额放入到redis排行榜中161           if((status==1||status==5||status==6||status==7||status==11)&&DateUtils.isSameDay(new Date(), ct)){162             //如果订单的状态是这几种,刚将该商品加入到实付金额的排行 榜中163             dates=U.getYearMonthDayHour(ct);//164             int days=dates[2];165             //某一个商品某一天的实付金额166             BigDecimal itemRelAmount=BigDecimal.ZERO;167             //从redis里取出这个商品的实付金额,然后累加168             String itemRelAmountStr=readRedisService.get(Constans.CACHE_PERITEMRELAMOUNTSS_KEY_PREFIX+productId+Constans.CACHE_KEY_SEPARATOR+days);169             if(StringUtils.isNotBlank(itemRelAmountStr)){170               itemRelAmount=new BigDecimal(itemRelAmountStr);171             }172             realtimeAmount=itemRelAmount.add(realtimeAmount);173             writeRedisService.set(Constans.CACHE_PERITEMRELAMOUNTSS_KEY_PREFIX+productId+Constans.CACHE_KEY_SEPARATOR+days, realtimeAmount.toPlainString());174             writeRedisService.lpush(Constans.CACHE_DELKEYS_KEY_PRDFIX+days, Constans.CACHE_PERITEMRELAMOUNTSS_KEY_PREFIX+productId+Constans.CACHE_KEY_SEPARATOR+days);175             writeRedisService.zadd(Constans.CACHE_ITEMREALAMOUNTSS_KEY+days, realtimeAmount.doubleValue(), productId+"");176             //确认的销量177             Long itemCount= writeRedisService.incrBy(Constans.CACHE_ITEMSALES_KEY_PRDFIX+productId+Constans.CACHE_KEY_SEPARATOR+days,buyNo);178             writeRedisService.zadd(Constans.CACHE_ITEMSALES_SS_KEY_PRDFIX+days, itemCount, productId+"");179             180             String itemType="";181             Map<String, String> pMap = this.readRedisService.hmget(Constans.CACHE_PRODUCT_KEY+productId);182             itemType=pMap.get("categoryId");183             if(StringUtils.isNotBlank(itemType)){184               if(ProductCategory.isGuanBai(itemType)){185                 //如果是白酒 官白的访客数排行 186                 this.writeRedisService.zadd(Constans.CACHE_ITEMREALAMOUNTWHITESS_KEY+days, realtimeAmount.doubleValue(), productId+"");//187                 //确认的销量排行188                 this.writeRedisService.zadd(Constans.CACHE_ITEMSALESWHITE_SS_KEY_PRDFIX+days, itemCount, productId+"");//189               }else if(ProductCategory.isGuanHong(itemType)){190                 //官红的访客数排行 191                 this.writeRedisService.zadd(Constans.CACHE_ITEMREALAMOUNTREDSS_KEY+days, realtimeAmount.doubleValue(), productId+"");//192                 //确认的销量排行193                 this.writeRedisService.zadd(Constans.CACHE_ITEMSALESRED_SS_KEY_PRDFIX+days, itemCount, productId+"");//194               }195             }196             197             //某一个商品的销量加入删除列表198             writeRedisService.lpush(Constans.CACHE_DELKEYS_KEY_PRDFIX+days, Constans.CACHE_ITEMSALES_KEY_PRDFIX+productId+Constans.CACHE_KEY_SEPARATOR+days);199           }200         }201         try {202           //TODO 将订单商品明细入库203           this.statisticsService.insertRelOrdersItemBatch(relOrdersItemList);204           //再将订单的状态改为已计算205           this.statisticsService.updateIsCal(orderSN,IsCal.已计算.getFlag());//将是否计算改成已计算206           //该订单的所有商品的成本同步到现在的库中。207           this.calOrderProductCostSync(orderId,orderSN,products);208         } catch (Exception e) {209           logger.error("insertRelOrdersItemBatch or updateIsCal error",e);210         }211       }212     }213     watch.stop();214     logger.info("CalculateAmount task run end........total cost time:"+watch.getTime()+"  orderId:"+orderId);215   }216  217   private void calOrderProductCostSync(int orderId,String orderSN,List<Map<String, Object>> products){218     List<Map<String, Object>> ordersList = this.erpOrderGoodsService.selectProductCostByOrderSN(orderSN);219     if(ordersList==null||ordersList.isEmpty()){220       logger.error("according orderId to query some data from erp return is null.........");221       return;222     }223     Map<String, String> itemIdToItemSnMap = U.convertToMapByList(products, "pro_ProductID", "SN");224     225     List<RelItemCosts> list=Lists.newArrayList();226     for (Map<String, Object> map : ordersList) {227       RelItemCosts itemCost=new RelItemCosts();228       if(map==null){229         continue;230       }231       Integer itemId=U.nvlInt(map.get("goods_id"),-99);232       BigDecimal costs=U.nvlDecimal(map.get("Dynamic_price"), BigDecimal.ZERO);233       itemCost.setId(U.randomUUID());234       itemCost.setOrdersId(orderId+"");235       itemCost.setOrdersNo(orderSN);236       itemCost.setItemId(itemId);237       itemCost.setItemNo(itemIdToItemSnMap.get(itemId+""));238       itemCost.setCosts(costs);239       itemCost.setCreateTime(new Date());240       itemCost.setModifyTime(new Date());241       list.add(itemCost);242     }243     244     this.statisticsService.insertRelItemCostsBatch(list);245     246   }247   248 }

 

  注意:

    1、redis2.6版本使用lpush、rpop出列的时候会丢失数据。换成2.8及以上的版本运行正常。

    2、由于应用会部署到多个结点,所以无法直接采用java的BlockingQueue阻塞队列,帮采用redis提供的队列支持。

    3、如果要做到统计的绝对实时,最好采用大数据的实时计算的解决方案:kafka+storm 来实现

  以上为队列结合线程的实践案例,供大家一起探讨。

    转载请注明出处 ,请大家尊重作者的劳动成果。