你的位置:首页 > Java教程

[Java教程]第五章 服务熔断(hystrix)+ retrofit底层通信(AsyncHttpclient)


 

一、集群容错

技术选型:hystrix。(就是上图中熔断器

熔断的作用

第一个作用:

假设有两台服务器server1(假设可以处理的请求阈值是1W请求)和server2,在server1上注册了三个服务service1、service2、service3,在server2上注册了一个服务service4,假设service4服务响应缓慢,service1调用service4时,一直在等待响应,那么在高并发下,很快的server1处很快就会达到请求阈值(server1很快就会耗尽处理线程)之后可能宕机,这时候,不只是service1不再可用,server1上的service2和service3也不可用了。

如果我们引入了hystrix,那么service1调用service4的时候,当发现service4超时,立即断掉不再执行,执行getFallback逻辑。这样的话,server1就不会耗尽处理线程,server1上的其他服务也是可用的。当然,这是在合理的配置了超时时间的情况下,如果超时时间设置的太长的话,还是会出现未引入hystrix之前的情况。

第二个作用:

当被调服务经常失败,比如说在10min(可配)中之内调用了20次,失败了15次(可配),那么我们认为这个服务是失败的,先关闭该服务,等一会儿后再自动重新启动该服务!(这是真正的熔断!)

 

二、实现

1、framework

1.1、pom.

 1     <!-- converter-jackson --> 2     <dependency> 3       <groupId>com.squareup.retrofit</groupId> 4       <artifactId>converter-jackson</artifactId> 5       <version>1.9.0</version> 6     </dependency> 7     <!-- async-http-client --> 8     <dependency> 9       <groupId>com.ning</groupId>10       <artifactId>async-http-client</artifactId>11       <version>1.9.31</version>12     </dependency>13 14     <!-- hystrix -->15     <dependency>16       <groupId>com.netflix.hystrix</groupId>17       <artifactId>hystrix-core</artifactId>18       <version>1.5.3</version>19     </dependency>20     <dependency>21       <groupId>com.netflix.hystrix</groupId>22       <artifactId>hystrix-metrics-event-stream</artifactId>23       <version>1.5.3</version>24     </dependency>

View Code

说明:

  • 添加retrofit的Jackson转换器,默认为GSON
  • 添加AsyncHttpClient
  • 添加hystrix及其metrics包(后者用于展示hystrix的图表信息,以后会在优化部分完成)

1.2、服务通信(retrofit)+集群容错(hystrix)

1.2.1、RestAdapterConfig

 1 package com.microservice.retrofit; 2  3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.stereotype.Component; 5  6 import com.microservice.loadBalancer.MyLoadBalancer; 7 import com.microservice.loadBalancer.ServerAddress; 8  9 import retrofit.RestAdapter;10 import retrofit.converter.JacksonConverter;11 12 @Component13 public class RestAdapterConfig {14 15   @Autowired16   private MyLoadBalancer myLoadBalancer;17 18   /**19    * 负载均衡并且创建传入的API接口实例20   */21   public <T> T create(Class<T> tclass, String serviceName) {22     String commandGroupKey = tclass.getSimpleName();// 获得简单类名作为groupKey23 24     ServerAddress server = myLoadBalancer.chooseServer(serviceName);// 负载均衡25     RestAdapter restAdapter = new RestAdapter.Builder()26                  .setConverter(new JacksonConverter())27                  .setErrorHandler(new MyErrorHandler())28                  .setClient(new MyHttpClient(server, commandGroupKey))29                  .setEndpoint("/").build();30     T tclassInstance = restAdapter.create(tclass);31     return tclassInstance;32   }33 }

View Code

说明:这里我们定义了自己的retrofit.Client和自己的retrofit.ErrorHandler

1.2.2、MyHttpClient(自定义retrofit的Client)

 1 package com.microservice.retrofit; 2  3 import java.io.IOException; 4  5 import com.microservice.hystrix.HttpHystrixCommand; 6 import com.microservice.loadBalancer.ServerAddress; 7 import com.netflix.hystrix.HystrixCommand.Setter; 8 import com.netflix.hystrix.HystrixCommandGroupKey; 9 10 import retrofit.client.Client;11 import retrofit.client.Request;12 import retrofit.client.Response;13 14 public class MyHttpClient implements Client {15   private ServerAddress server;16   private String    commandGroupKey;17 18   public MyHttpClient(ServerAddress server, String commandGroupKey) {19     this.server = server;20     this.commandGroupKey = commandGroupKey;21   }22 23   @Override24   public Response execute(Request request) throws IOException {25     Setter setter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(commandGroupKey));26     return new HttpHystrixCommand(setter, server, request).execute();// 同步执行27   }28 }

View Code

说明:在execute()中引入了hystrix

  • 定义了hystrix的commandGroupKey是服务名(eg.myserviceA,被调用服务名
  • 没有定义commandKey(通常commandKey是服务的一个方法名,例如myserviceA的client的getProvinceByCityName),通常该方法名是被调用服务的client中的被调用方法名
  • 手动设置hystrix的属性
    • setter.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(1000));
    • 实际上,直接配置在consul上就好了,根据上一节archaius的自动拉取配置,hystrix会自动从pollResult中取配置并设置到实例中去。
  • 查看hystrix的属性
    • command.getProperties().executionTimeoutInMilliseconds().get(),这里的command就是下边的HttpHystrixCommand实例

1.2.3、HttpHystrixCommand(hystrix核心类)

 1 package com.microservice.hystrix; 2  3 import java.io.ByteArrayOutputStream; 4 import java.io.IOException; 5 import java.util.ArrayList; 6 import java.util.List; 7 import java.util.concurrent.Future; 8  9 import org.apache.commons.lang3.StringUtils; 10 import org.apache.tomcat.util.http.fileupload.IOUtils; 11 import org.slf4j.Logger; 12 import org.slf4j.LoggerFactory; 13  14 import com.microservice.loadBalancer.ServerAddress; 15 import com.netflix.hystrix.HystrixCommand; 16 import com.ning.http.client.AsyncHttpClient; 17 import com.ning.http.client.FluentCaseInsensitiveStringsMap; 18 import com.ning.http.client.RequestBuilder; 19  20 import retrofit.client.Header; 21 import retrofit.client.Request; 22 import retrofit.client.Response; 23 import retrofit.mime.TypedByteArray; 24 import retrofit.mime.TypedOutput; 25  26 public class HttpHystrixCommand extends HystrixCommand<Response> { 27   private static final Logger LOGGER = LoggerFactory.getLogger(HttpHystrixCommand.class); 28  29   private ServerAddress    server; 30   private Request       request; 31   private String       requestUrl; 32   private AsyncHttpClient   asyncHttpClient; 33  34   public HttpHystrixCommand(Setter setter, ServerAddress server, Request request) { 35     super(setter); 36     this.server = server; 37     this.request = request; 38  39     //    AsyncHttpClientConfig.Builder builder = new AsyncHttpClientConfig.Builder().setRequestTimeout(5000);//5s 40     //    this.asyncHttpClient = new AsyncHttpClient(builder.build()); 41     this.asyncHttpClient = new AsyncHttpClient(); 42   } 43  44   @Override 45   public Response run() throws Exception { 46     com.ning.http.client.Request asyncReq = retroReq2asyncReq(request, server); 47     Future<com.ning.http.client.Response> asyncResFuture = asyncHttpClient.executeRequest(asyncReq); 48     com.ning.http.client.Response asyncRes = asyncResFuture.get(); 49     return asynRes2RetroRes(asyncRes); 50   } 51  52   /** 53    * 1、设置方法请求类型,例如:GET/POST 54    * 2、转换请求头header(包括mime。这个需要根据请求体的情况进行掌握) 55    * 3、转换请求体 56    * 4、设置请求URL 57   */ 58   public com.ning.http.client.Request retroReq2asyncReq(Request request, ServerAddress server) { 59     RequestBuilder requestBuilder = new RequestBuilder(request.getMethod());//传入方法请求类型,例如:GET/POST 60     List<Header> headers = request.getHeaders(); 61     headers.forEach(x -> requestBuilder.addHeader(x.getName(), x.getValue())); 62  63     if (request.getBody() != null) { 64       String mimeType = StringUtils.EMPTY; 65       if (StringUtils.isNotEmpty(mimeType)) { 66         requestBuilder.addHeader("Content-Type", mimeType); 67       } else { 68         requestBuilder.addHeader("Content-Type", "application/json"); 69       } 70  71       TypedOutput body = request.getBody(); 72       ByteArrayOutputStream outPutStream = new ByteArrayOutputStream(); 73       try { 74         body.writeTo(outPutStream);//将body内容写入到ByteArrayOutputStream里 75         requestBuilder.setBody(outPutStream.toByteArray()); 76       } catch (IOException e) { 77         e.printStackTrace(); 78       } finally { 79         IOUtils.closeQuietly(outPutStream); 80       } 81     } 82     String url = new StringBuilder("http://").append(server.getIp()) 83                         .append(":") 84                          .append(server.getPort()) 85                         .append("/") 86                          .append(request.getUrl()).toString(); 87     requestUrl = url; 88     requestBuilder.setUrl(url); 89     return requestBuilder.build(); 90   } 91  92   public Response asynRes2RetroRes(com.ning.http.client.Response asyncRes) throws IOException { 93     return new Response(asyncRes.getUri().toUrl(),  94               asyncRes.getStatusCode(),  95               asyncRes.getStatusText(), 96               getHeaders(asyncRes.getHeaders()), 97               new TypedByteArray(asyncRes.getContentType(), asyncRes.getResponseBodyAsBytes())); 98   } 99 100   private List<Header> getHeaders(FluentCaseInsensitiveStringsMap asyncHeaders) {101     List<Header> retrofitHeaders = new ArrayList<>();102     asyncHeaders.keySet().forEach(key -> retrofitHeaders.add(new Header(key, asyncHeaders.getFirstValue(key))));103     return retrofitHeaders;104   }105 106   /**107    * 超时后的一些操作,或者如果缓存中有信息,可以从缓存中拿一些,具体的要看业务,也可以打一些logger108   */109   @Override110   public Response getFallback() {111     LOGGER.error("请求超时了!requestUrl:'{}'", requestUrl);112     /**113      * 想要让自定义的ErrorHandler起作用以及下边的404和reason有意义,就一定要配置requestUrl和List<header>114      * 其实这里可以看做是定义自定义异常的状态码和状态描述115      * 其中状态码用于自定义异常中的判断(见HystrixRuntimeException)116     */117     return new Response(requestUrl, 404, //定义状态码118       "execute getFallback because execution timeout", //定义消息 119       new ArrayList<Header>(), null);120   }121 }

View Code

说明:首先调用run(),run()失败或超时候调用getFallback()

  • run()--这里是一个定制口,我使用了AsyncHttpClient,还可以使用其他的网络调用工具,例如:okhttp
    • 首先将Retrofit的请求信息Request转化为AsyncHttpClient的Request(在这里调用了负载均衡,将请求负载到选出的一台机器)
    • 之后调用AsyncHttpClient来进行真正的http调用,并返回AsyncHttpClient型的相应Response
    • 最后将AsyncHttpClient型的响应Response转换为Retrofit型的Response
  • getFallback()
    • 直接抛异常是不行的(该接口不让),只能采取以下的方式
    • 返回一个Response对象,该对象封装了status是404+错误的原因reason+请求的url+相应的Header列表+响应体(这里的status和reason会被用在ErrorHandler中去用于指定执行不同的逻辑,具体看下边的MyErrorHandler)
    • 如果想让MyErrorHandler起作用,Response对象必须有"请求的url+相应的Header列表",其中Header列表可以使一个空List实现类,但是不可为null
  • 在构建AsyncHttpClient实例时可以设置相关的http参数,例如:注释部分的设置请求超时时间。
    • 值得注意的是我们在配置请求超时时间时,要结合hystrix的超时时间来设置,程序会以二者的最小值作为请求超时时间

1.2.4、MyErrorHandler(自定义retrofit的错误处理器)

 1 package com.microservice.retrofit; 2  3 import com.microservice.exception.HystrixRuntimeException; 4  5 import retrofit.ErrorHandler; 6 import retrofit.RetrofitError; 7 import retrofit.client.Response; 8  9 public class MyErrorHandler implements ErrorHandler{10   @Override11   public Throwable handleError(RetrofitError cause) {12     Response response = cause.getResponse();13     /**14      * 这里是一个可以定制的地方,自己可以定义所有想要捕获的异常15     */16     if(response!=null && response.getStatus()==404){17       return new HystrixRuntimeException(cause);18     }19     return cause;20   }21 }

View Code

说明:当发生了retrofit.error时(不只是上边的getFallback()返回的Response),我们可以在该ErrorHandler的handleError方法来进行相应Response的处理。这里我们指定当404时返回一个自定义异常。

1.2.5、HystrixRuntimeException(自定义异常)

 1 package com.microservice.exception; 2  3 /** 4  * 自定义异常 5 */ 6 public class HystrixRuntimeException extends RuntimeException { 7   private static final long serialVersionUID = 8252124808929848902L; 8  9   public HystrixRuntimeException(Throwable cause) {10     super(cause);//只有这样,才能将异常信息抛给客户端11   }12 }

View Code

说明:自定义异常只能通过super()来向客户端抛出自己指定的异常信息(上边的Response的reason,但是抛到客户端时还是一个500错误,因为run()错误或超时就是一个服务端错误)。

 

整个流程:

当myserviceB调用myserviceA的一个方法时,首先会执行自定义的MyHttpClient的execute()方法,在该execute()方法中我们执行了自定义的HttpHystrixCommand的execute()方法,此时就会执行执行HttpHystrixCommand的run()方法,如果该方法运行正常并在超时时间内返回数据,则调用结束。

如果run()方法调用失败或该方法超时,就会直接运行HttpHystrixCommand的getFallback()方法。该方法返回一个retrofit.Response对象,该对象的status是404,错误信息也是自定义的。之后该对象会被包装到RetrofitError对象中,之后RetrofitError对象会由MyErrorHandler的handleError()进行处理:从RetrofitError对象中先取出Response,之后根据该Response的status执行相应的操作,我们这里对404的情况定义了一个自定义异常HystrixRuntimeException。

 注意点:

  • retrofit的Response最好不要是null
  • retrofit的Jackson转换器无法转化单纯的String(因为Jackson转换器会将一个json串转化为json对象),这一点缺点可以看做没有,因为我们的接口都是restful的,那么我们都是使用json格式来通信的。 

 

三、配置与测试

1、配置

在consul上配置service/myserviceA/dev/config的配置内容和service/myserviceB/dev/config的内容。其中,myserviceB配置了hystrix的超时时间:

1 hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=1000

View Code

说明:关于hystrix的配置参数,查看http://www.cnblogs.com/java-zhao/p/5524584.html

2、测试

最后,启动consul,启动服务A和B,swagger测试就好了!!!(在测试过程中,可以动态的去改变consul中hystrix的超时时间值,来测试archaius的动态读取)