java active

java active,第1张

java active是什么,让我们一起了解一下?

在java中,Active Object模式是一种异步编程模式。它通过对方法的调用与方法的执行进行解耦来提高并发性。它的核心是允许任务的提交(相当于对异步方法的调用)和任务的执行(相当于异步方法的真正执行)分离。

首先从调用方代码来看,调用一个Active Object对象的方法与调用普通Java对象的方法并无太大差别。

1 ActiveObject ao=... 2 Future future = ao.doSomething("data") 3 //执行其它 *** 作 4 String result = future.get() 5 System.out.println(result)

那么Active Object模式的架构是什么,有什么作用?

1、当Active Object模式对外暴露的异步方法被调用时,与该方法调用相关的上下文信息,包括被调用的异步方法名(或其代表的 *** 作)、调用方代码所传递的参数等,会被封装成一个对象。

该对象被称为方法请求(Method Request)。方法请求对象会被存入Active Object模式所维护的缓冲区(Activation Queue)中,并由专门的工作线程负责根据其包含的上下文信息执行相应的 *** 作。

也就是说,方法请求对象是由运行调用方代码的线程通过调用Active Object模式对外暴露的异步方法生成的,而方法请求所代表的 *** 作则由专门的线程来执行,从而实现了方法的调用与执行的分离,产生了并发。

2、Active Object模式的主要参与者有以下几种。

Proxy:负责对外暴露异步方法接口。当调用方代码调用该参与者实例的异步方法doSomething时,该方法会生成一个相应的MethodRequest实例并将其存储到Scheduler所维护的缓冲区中。doSomething方法的返回值是一个表示其执行结果的外包装对象:Future参与者的实例。异步方法doSomething运行在调用方代码所在的线程中。

MethodRequest:负责将调用方代码对Proxy实例的异步方法的调用封装为一个对象。该对象保留了异步方法的名称及调用方代码传递的参数等上下文信息。它使得将Proxy的异步方法的调用和执行分离成为可能。其call方法会根据其所包含上下文信息调用Servant实例的相应方法。

ActivationQueue:负责临时存储由Proxy的异步方法被调用时所创建的MethodRequest实例的缓冲区。

Scheduler:负责将Proxy的异步方法所创建的MethodRequest实例存入其维护的缓冲区中。并根据一定的调度策略,对其维护的缓冲区中的MethodRequest实例进行执行。其调度策略可以根据实际需要来定,如FIFO、LIFO和根据MethodRequest中包含的信息所定的优先级等。

Servant:负责对Proxy所暴露的异步方法的具体实现。

Future:负责存储和返回Active Object异步方法的执行结果。

用异步输入输出流编写Socket进程通信程序

在Merlin中加入了用于实现异步输入输出机制的应用程序接口包:java.nio(新的输入输出包,定义了很多基本类型缓冲(Buffer)),java.nio.channels(通道及选择器等,用于异步输入输出),java.nio.charset(字符的编码解码)。通道(Channel)首先在选择器(Selector)中注册自己感兴趣的事件,当相应的事件发生时,选择器便通过选择键(SelectionKey)通知已注册的通道。然后通道将需要处理的信息,通过缓冲(Buffer)打包,编码/解码,完成输入输出控制。

通道介绍:

这里主要介绍ServerSocketChannel和 SocketChannel.它们都是可选择的(selectable)通道,分别可以工作在同步和异步两种方式下(注意,这里的可选择不是指可以选择两种工作方式,而是指可以有选择的注册自己感兴趣的事件)。可以用channel.configureBlocking(Boolean )来设置其工作方式。与以前版本的API相比较,ServerSocketChannel就相当于ServerSocket(ServerSocketChannel封装了ServerSocket),而SocketChannel就相当于Socket(SocketChannel封装了Socket)。当通道工作在同步方式时,编程方法与以前的基本相似,这里主要介绍异步工作方式。

所谓异步输入输出机制,是指在进行输入输出处理时,不必等到输入输出处理完毕才返回。所以异步的同义语是非阻塞(None Blocking)。在服务器端,ServerSocketChannel通过静态函数open()返回一个实例serverChl。然后该通道调用serverChl.socket().bind()绑定到服务器某端口,并调用register(Selector sel, SelectionKey.OP_ACCEPT)注册OP_ACCEPT事件到一个选择器中(ServerSocketChannel只可以注册OP_ACCEPT事件)。当有客户请求连接时,选择器就会通知该通道有客户连接请求,就可以进行相应的输入输出控制了;在客户端,clientChl实例注册自己感兴趣的事件后(可以是OP_CONNECT,OP_READ,OP_WRITE的组合),调用clientChl.connect(InetSocketAddress )连接服务器然后进行相应处理。注意,这里的连接是异步的,即会立即返回而继续执行后面的代码。

选择器和选择键介绍:

选择器(Selector)的作用是:将通道感兴趣的事件放入队列中,而不是马上提交给应用程序,等已注册的通道自己来请求处理这些事件。换句话说,就是选择器将会随时报告已经准备好了的通道,而且是按照先进先出的顺序。那么,选择器是通过什么来报告的呢?选择键(SelectionKey)。选择键的作用就是表明哪个通道已经做好了准备,准备干什么。你也许马上会想到,那一定是已注册的通道感兴趣的事件。不错,例如对于服务器端serverChl来说,可以调用key.isAcceptable()来通知serverChl有客户端连接请求。相应的函数还有:SelectionKey.isReadable(),SelectionKey.isWritable()。一般的,在一个循环中轮询感兴趣的事件(具体可参照下面的代码)。如果选择器中尚无通道已注册事件发生,调用Selector.select()将阻塞,直到有事件发生为止。另外,可以调用selectNow()或者select(long timeout)。前者立即返回,没有事件时返回0值;后者等待timeout时间后返回。一个选择器最多可以同时被63个通道一起注册使用。

应用实例:

下面是用异步输入输出机制实现的客户/服务器实例程序――程序清单1(限于篇幅,只给出了服务器端实现,读者可以参照着实现客户端代码):

程序类图

public class NBlockingServer {

int port = 8000

int BUFFERSIZE = 1024

Selector selector = null

ServerSocketChannel serverChannel = null

HashMap clientChannelMap = null//用来存放每一个客户连接对应的套接字和通道

public NBlockingServer( int port ) {

this.clientChannelMap = new HashMap()

this.port = port

}

public void initialize() throws IOException {

//初始化,分别实例化一个选择器,一个服务器端可选择通道

this.selector = Selector.open()

this.serverChannel = ServerSocketChannel.open()

this.serverChannel.configureBlocking(false)

InetAddress localhost = InetAddress.getLocalHost()

InetSocketAddress isa = new InetSocketAddress(localhost, this.port )

this.serverChannel.socket().bind(isa)//将该套接字绑定到服务器某一可用端口

}

//结束时释放资源

public void finalize() throws IOException {

this.serverChannel.close()

this.selector.close()

}

//将读入字节缓冲的信息解码

public String decode( ByteBuffer byteBuffer ) throws

CharacterCodingException {

Charset charset = Charset.forName( "ISO-8859-1" )

CharsetDecoder decoder = charset.newDecoder()

CharBuffer charBuffer = decoder.decode( byteBuffer )

String result = charBuffer.toString()

return result

}

//监听端口,当通道准备好时进行相应 *** 作

public void portListening() throws IOException, InterruptedException {

//服务器端通道注册OP_ACCEPT事件

SelectionKey acceptKey =this.serverChannel.register( this.selector,

SelectionKey.OP_ACCEPT )

//当有已注册的事件发生时,select()返回值将大于0

while (acceptKey.selector().select() >0 ) {

System.out.println("event happened")

//取得所有已经准备好的所有选择键

Set readyKeys = this.selector.selectedKeys()

//使用迭代器对选择键进行轮询

Iterator i = readyKeys.iterator()

while (i

else if ( key.isReadable() ) {//如果是通道读准备好事件

System.out.println("Readable")

//取得选择键对应的通道和套接字

SelectableChannel nextReady =

(SelectableChannel) key.channel()

Socket socket = (Socket) key.attachment()

//处理该事件,处理方法已封装在类ClientChInstance中

this.readFromChannel( socket.getChannel(),

(ClientChInstance)

this.clientChannelMap.get( socket ) )

}

else if ( key.isWritable() ) {//如果是通道写准备好事件

System.out.println("writeable")

//取得套接字后处理,方法同上

Socket socket = (Socket) key.attachment()

SocketChannel channel = (SocketChannel)

socket.getChannel()

this.writeToChannel( channel,"This is from server!")

}

}

}

}

//对通道的写 *** 作

public void writeToChannel( SocketChannel channel, String message )

throws IOException {

ByteBuffer buf = ByteBuffer.wrap( message.getBytes() )

int nbytes = channel.write( buf )

}

//对通道的读 *** 作

public void readFromChannel( SocketChannel channel, ClientChInstance clientInstance )

throws IOException, InterruptedException {

ByteBuffer byteBuffer = ByteBuffer.allocate( BUFFERSIZE )

int nbytes = channel.read( byteBuffer )

byteBuffer.flip()

String result = this.decode( byteBuffer )

//当客户端发出”@exit”退出命令时,关闭其通道

if ( result.indexOf( "@exit" ) >= 0 ) {

channel.close()

}

else {

clientInstance.append( result.toString() )

//读入一行完毕,执行相应 *** 作

if ( result.indexOf( "\n" ) >= 0 ){

System.out.println("client input"+result)

clientInstance.execute()

}

}

}

//该类封装了怎样对客户端的通道进行 *** 作,具体实现可以通过重载execute()方法

public class ClientChInstance {

SocketChannel channel

StringBuffer buffer=new StringBuffer()

public ClientChInstance( SocketChannel channel ) {

this.channel = channel

}

public void execute() throws IOException {

String message = "This is response after reading from channel!"

writeToChannel( this.channel, message )

buffer = new StringBuffer()

}

//当一行没有结束时,将当前字窜置于缓冲尾

public void append( String values ) {

buffer.append( values )

}

}

//主程序

public static void main( String[] args ) {

NBlockingServer nbServer = new NBlockingServer(8000)

try {

nbServer.initialize()

} catch ( Exception e ) {

e.printStackTrace()

System.exit( -1 )

}

try {

nbServer.portListening()

}

catch ( Exception e ) {

e.printStackTrace()

}

}

}

程序清单1

小结:

从以上程序段可以看出,服务器端没有引入多余线程就完成了多客户的客户/服务器模式。该程序中使用了回调模式(CALLBACK)。需要注意的是,请不要将原来的输入输出包与新加入的输入输出包混用,因为出于一些原因的考虑,这两个包并不兼容。即使用通道时请使用缓冲完成输入输出控制。该程序在Windows2000,J2SE1.4下,用telnet测试成功。


欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/yw/7917198.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-04-11
下一篇2023-04-11

发表评论

登录后才能评论

评论列表(0条)

    保存