一,HTTP解码器可能会将一个HTTP请求解析成多个消息对象。
ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new ParseRequestHandler());
经过HttpServerCodec解码之后,一个HTTP请求会导致:ParseRequestHandler的 channelRead()方法调用多次(测试时 "received message"输出了两次)
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("received message");
可以用HttpObjectAggregator 将多个消息转换为单一的一个FullHttpRequest,如下:
ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new HttpObjectAggregator(65536)); ch.pipeline().addLast(new ParseRequestHandler());
此时,一个HTTP消息(Object msg)是下面这样的。
HttpObjectAggregator$AggregatedFullHttpRequest(decodeResult: success, version: HTTP/1.1, content: CompositeByteBuf(ridx: 0, widx: 17, cap: 17, components=1))POST / HTTP/1.1Host: 127.0.0.1:8888User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64; rv:46.0) Gecko/20100101 Firefox/46.0Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8Accept-Language: nullAccept-Encoding: gzip, deflateContent-Type: application/x-www-form-urlencodedContent-Length: 17Cookie: _ga=GA1.1.457486782.1446782739Connection: keep-alive
从上面可以看出,实体首部字段Content-Length是17,表明实体主体有17个字节。
而我发送的消息是这样的:
HTTP POST 请求,请求体是JSON格式的数据。这里使用的是json-lib解析的 Json字符串。代码如下:
//parse job type 0,1 private String getJobType(FullHttpRequest request) throws IOException{ ByteBuf jsonBuf = request.content(); String jsonStr = jsonBuf.toString(CharsetUtil.UTF_8); JSONObject jsonObj = JSONObject.fromObject(jsonStr); String jobType = jsonObj.getString("jobType"); return jobType; }
需要注意是:使用json-lib解析Json字符串时,需要其他的依赖包如下:
解析完成之后,需要把处理后的结果发送到下一个ChannelHandler,进行下一步的处理。
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //do some process ..... ctx.fireChannelRead(job);}
注意,使用的是fireChannelRead()方法,而不是 ctx.writeAndFlush(...)。因为,writeAndFlush/write 是Outbound,它是把消息发送到上一个Handler,进而发送到remote peer,而这里是InBound。
这里,通过 ctx.fireChannelRead(job); 将处理后的结果发送到下一个Channel处理。
ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new HttpObjectAggregator(2048)); ch.pipeline().addLast(new ParseRequestHandler()); ch.pipeline().addLast(new OozieRequestHandler());
下一个Handler是OozieRequestHandler,它负责向Oozie Server提交作业,之后返回jobId给客户端(HttpServerCodec Handler 负责底层传输细节)。
Netty构造一个http 响应的方法如下:
String jobId = doPost(jobConfig);FullHttpResponse response = new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(jobId.getBytes()));response.headers().set(CONTENT_TYPE, "application/xml"); response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes()); ctx.write(response).addListener(ChannelFutureListener.CLOSE);
整个完整代码可参考:https://github.com/hapjin/netty_schedule