package com.yutu.base.service;

import com.alibaba.fastjson.JSONObject;
import com.yutu.base.entity.Response;
import com.yutu.base.utils.HttpsSendData;
import org.apache.log4j.Logger;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;

/**
 * Static message queue holding asynchronous responses: each element carries the
 * destination URL and the response content to send to it.
 *
 * <p>Producers enqueue through {@link #setResponseMessageQueue(String, Response)}.
 * A single worker thread, started from {@link #run(String...)}, takes elements
 * off the queue and sends them one at a time.
 */
@Component
public class MessageQueueController implements CommandLineRunner {

    private static final Logger logger = Logger.getLogger("messagequeue");

    /** Response message queue; each element is a map of response URL to response object. */
    public static BlockingQueue<Map<String, Response>> responseMessageQueue = new LinkedBlockingDeque<>();

    /**
     * Enqueues a response to be sent asynchronously to the given URL.
     *
     * <p>Failures are logged rather than propagated. If the calling thread is
     * interrupted while enqueuing, its interrupt status is restored so the
     * caller can still observe the interruption.
     *
     * @param responseUrl URL the response should be sent to
     * @param response    response object to send
     */
    public static void setResponseMessageQueue(String responseUrl, Response response) {
        Map<String, Response> responseMap = new HashMap<>();
        responseMap.put(responseUrl, response);
        try {
            responseMessageQueue.put(responseMap);
        } catch (InterruptedException e) {
            // Do not swallow the interruption: restore the flag for the caller.
            Thread.currentThread().interrupt();
            logger.error("响应消息队列,入队列异常,参数:" + responseUrl + "," + response, e);
        } catch (Exception e) {
            logger.error("响应消息队列,入队列异常,参数:" + responseUrl + "," + response, e);
        }
    }

    /**
     * Starts consuming the response message queue as soon as the application starts.
     *
     * @param args command-line arguments (unused)
     */
    @Override
    public void run(String... args) throws Exception {
        logger.info("启动程序即开始消费响应消息队列");
        consumeResponseMessageQueue();
    }

    /** Starts the single worker thread that consumes the response message queue. */
    private void consumeResponseMessageQueue() {
        ExecutorService threadPool = Executors.newFixedThreadPool(1);
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                consumeLoop();
            }
        });
    }

    /**
     * Takes queued entries one at a time and sends each response to its URL.
     * Runs until the worker thread is interrupted; any other failure while
     * handling an element is logged and the loop continues with the next one.
     */
    private void consumeLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            logger.info("响应消息队列当前长度:" + responseMessageQueue.size());
            Map<String, Response> map = null;
            try {
                map = responseMessageQueue.take();
                for (Map.Entry<String, Response> entry : map.entrySet()) {
                    String url = String.valueOf(entry.getKey());
                    Response response = entry.getValue();
                    logger.info("响应消息队列,获取和执行发送任务,参数:responseUrl=" + url + ",Response=" + response);
                    sendResponse(url, response);
                }
            } catch (InterruptedException e) {
                // Interruption is the stop signal for this worker: restore the flag and exit.
                Thread.currentThread().interrupt();
                logger.error("响应消息队列执行任务时异常,参数" + map, e);
                return;
            } catch (Exception e) {
                logger.error("响应消息队列执行任务时异常,参数" + map, e);
            }
        }
    }

    /**
     * Sends the response, serialized as JSON, to the given URL via
     * {@code HttpsSendData.getPost}. Failures are logged and not propagated.
     *
     * @param url      destination URL
     * @param response response object to serialize and send
     */
    private void sendResponse(String url, Response response) {
        try {
            HttpsSendData.getPost(url, JSONObject.toJSONString(response));
        } catch (Exception e) {
            logger.error("发送异步响应消息异常", e);
        }
    }
}