博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java的网络工具netty简介
阅读量:6440 次
发布时间:2019-06-23

本文共 7093 字,大约阅读时间需要 23 分钟。

hot3.png

java的网络工具netty简介

         Netty是一个NIO的客服端服务器框架,它可以简单、快速的搭建器一个协议包客服端服务器的应用程序。它极大的简化了TCP和UDP这类的网络编程。

   “快速”和“简单”并不意味着会让你的最终应用产生维护性或性能上的问题。Netty 是一个吸收了多种协议的实现经验,这些协议包括FTP,SMTP,HTTP,各种二进制,文本协议,并经过相当精心设计的项目,最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。

        这里简单记录下学习要点,详细的讲解。可以看官网(github: )或者查看李林锋的的系列文章 。

        体系结构图:

114323_dxUC_2246410.png

        由李林锋讲解的易懂的架构图:

114530_SsI8_2246410.jpg

    1、两个selector线程:mainReactor处理accpet事件、subReactor处理connection、read、send事件

    2、业务处理线程池:包括编码、解码、业务处理。

1、官网案例

    a、处理bytes的serverhandler

/** * @see 进入的channel:用于处理接受时候的事件处理 */public class TimeServerHandler extends ChannelInboundHandlerAdapter {	/**	 * @see 当一个channel准备好的时候,发送一个32位的数字	 */	public void channelActive(final ChannelHandlerContext ctx) {		// ByteBuf:没有了flip()。它只有2个功能:读、写		// 读:		// 写:当你写的时候,如果读取下标没有改变,则继续增长		final ByteBuf time = ctx.alloc().buffer(4);		time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));		final ChannelFuture f = ctx.writeAndFlush(time);		// 当写如完成的时候,执行		f.addListener(new ChannelFutureListener() {			public void operationComplete(ChannelFuture future) throws Exception {				// TODO Auto-generated method stub				assert f == future;				ctx.close();			}		});	}	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {		cause.printStackTrace();		ctx.close();	}}

    sever的启动部分

public class TimeServer {	private int port;	public TimeServer() {		this.port = port;	}	public void runn() throws Exception {		EventLoopGroup bossGroup = new NioEventLoopGroup();		EventLoopGroup workerGroup = new NioEventLoopGroup();		try {			ServerBootstrap b = new ServerBootstrap();			b.group(bossGroup, workerGroup);			b.channel(NioServerSocketChannel.class);			b.childHandler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) throws Exception { // TODO Auto-generated method stub ch.pipeline().addLast(new TimeServerHandler()); } }); b.option(ChannelOption.SO_BACKLOG, 128); b.childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) { try { new TimeServer().runn(); } catch (Exception e) { e.printStackTrace(); } }}

    b、client部分:处理字节

public class TimeDecoder extends ByteToMessageDecoder {	/**	 * @see 定义一个回调的数据累加buff	 * @see 如果有out,则表示解析成功。	 */	@Override	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {		if (in.readableBytes() < 4) {			return;		}		out.add(in.readBytes(4));	}}

   client的 channel处理类

public class TimeClientHandler extends ChannelInboundHandlerAdapter {	public void channelRead(ChannelHandlerContext ctx, Object msg) {		ByteBuf buf = (ByteBuf) msg;		try {			long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;			System.out.println(new Date(currentTimeMillis));			ctx.close();		} catch (Exception e) {			e.printStackTrace();		} finally {			buf.release();		}	}	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {		cause.printStackTrace();		ctx.close();	}}

    client的启动类:

public class TimeClient {	public static void main(String[] args) throws InterruptedException {		String host = args[0];		int port = Integer.parseInt(args[1]);		EventLoopGroup workerGroup = new NioEventLoopGroup();		try {			Bootstrap b = new Bootstrap();// 启动客服端连接			b.group(workerGroup);// 同时用于主线程和工作线程			b.channel(NioSocketChannel.class);// 客服端需要的channel			b.option(ChannelOption.SO_KEEPALIVE, true); // socketChannel没有父类						b.handler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) throws Exception { // TODO Auto-generated method stub ch.pipeline().addLast(new TimeClientHandler()); } }); ChannelFuture f = b.connect(host, port).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } }}

2、stream处理

    小型buffer的socket传送流传输依据TCP/IP,接受的数据是储存在一个接受的socket的buffer中。但是,传送的buffer不是一个队列包、而是一个队列btyes。这就意味着,即使你使用两个包去传送两端信息,系统不会将它们视为两端信息,而是作为一串bytes。因此,这不能保证你读去的数据是你远程写入的数据。例如,我们需要使用系统的TCP/IP栈接受到3个数据包。

     151401_woxN_2246410.png

因为根据流协议,你很有可能在你的应用中读取到你下面的部分

151415_hfeU_2246410.png

因次,在服务器和客服端的接受部分,对接受数据必须定义一个协议的框架(处理方式),这个框架能够被应用程序使用。接收到的部分必须是下面这种方式。

151401_woxN_2246410.png

    a、第一种解决方式:

        在TIME client的实例中。我们同样是有一个相似的问题,一个非常小的32位bit的整数数据,它不太可能分散。然而,随着流量的增加,问题是它会碎片化。

        简单的解决方式,增加一个内部的累加buffer,然后将接受的4bytes传输到这个buffer中。在TimeClientHandler直接修改

public class TimeClientHandler2 extends ChannelInboundHandlerAdapter {	private ByteBuf buf;	@Override	public void handlerAdded(ChannelHandlerContext ctx) {		buf = ctx.alloc().buffer(4);	}	@Override	public void handlerRemoved(ChannelHandlerContext ctx) {		buf.release();		buf = null;	}	@Override	public void channelRead(ChannelHandlerContext ctx, Object msg) {		ByteBuf m = (ByteBuf) msg;		buf.writeBytes(m);		m.release();		if (buf.readableBytes() >= 4) {			long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;			System.out.println(new Date(currentTimeMillis));			ctx.close();		}	}	@Override	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {		cause.printStackTrace();		ctx.close();	}}

    b、第二种就是前面的实例方式、将decode分离出来处理。看起来清晰、方便

3、编解object数据

    object

public class UnixTime {	private final long value;	public UnixTime() {		this(System.currentTimeMillis() / 1000L + 2208988800L);	}	public UnixTime(long value) {		this.value = value;	}	public long value() {		return this.value;	}	public String toString() {		return new Date((value() - 2208988800L) * 1000L).toString();	}}

    object:decode

public class TimeDecoder2 extends ByteToMessageDecoder {	/**	 * @see 定义一个回调的数据累加buff	 * @see 如果有out,则表示解析成功。	 */	@Override	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {		if (in.readableBytes() < 4) {			return;		}		out.add(new UnixTime(in.readUnsignedInt()));	}}

    objec:encode

public class TimeEncoder extends ChannelOutboundHandlerAdapter {	public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {		UnixTime m = (UnixTime) msg;		ByteBuf encoded = ctx.alloc().buffer(4);		encoded.writeInt((int) m.value());		ctx.write(encoded, promise);	}

   

    server:handler

public class TimeServerHandler2 extends ChannelInboundHandlerAdapter {	/**	 * @see 当一个channel准备好的时候,发送一个32位的数字	 */	public void channelActive(final ChannelHandlerContext ctx) {		// ByteBuf:没有了flip()。它只有2个功能:读、写		// 读:		// 写:当你写的时候,如果读取下标没有改变,则继续增长		final ByteBuf time = ctx.alloc().buffer(4);		time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));		final ChannelFuture f = ctx.writeAndFlush(new UnixTime());		// 当写如完成的时候,执行		f.addListener(ChannelFutureListener.CLOSE);	}	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {		cause.printStackTrace();		ctx.close();	}}

 

client:handler

public class TimeClientHandler3 extends ChannelInboundHandlerAdapter {	@Override	public void channelRead(ChannelHandlerContext ctx, Object msg) {		UnixTime m = (UnixTime) msg;		System.out.println(m);		ctx.close();	}	@Override	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {		cause.printStackTrace();		ctx.close();	}}

转载于:https://my.oschina.net/u/2246410/blog/652313

你可能感兴趣的文章
GridView如何设置View的初始样式
查看>>
Placeholder in IE8 and older
查看>>
SQL语句字符串处理大全
查看>>
环境变量的作用,为什么要设置环境变量?
查看>>
从尾到头打印单链表
查看>>
getopt
查看>>
我的第一个IT产品:PublicLecture@HK【My First IT Product】
查看>>
优秀员工与普通员工
查看>>
CCNP学习笔记15-RSTP
查看>>
DELL服务器iDRAC相关设置
查看>>
JVM学习笔记(一)------基本结构
查看>>
$@等特定shell变量的含义
查看>>
我的友情链接
查看>>
(超详细版)Linux下Hadoop2.7.1集群环境的搭建(3台为例)
查看>>
策略模式、上下文与内部类的思考
查看>>
关于getCurrentUrl的获取问题
查看>>
2014年工作中遇到的20个问题:120-140
查看>>
解决win10不能安装NVIDIA的RTX 20系列的显卡驱动问题
查看>>
pdf如何解密
查看>>
×××S 2012 聚合函数 -- 介绍
查看>>