- 浏览: 41338 次
- 性别:
- 来自: 大连
文章分类
其中主要想探讨的是一个监听连接的AcceptorReactor类,一个监听数据到达的SessionReactor类,一个服务器断主控类ServerManager,一个控制数据发送、接收、存储用户信息的Session类。
在服务器运行的时候,只有3个线程在跑,一个是main主线程,一个是监听连接的线程,一个是监听客户端数据到达的线程。当有客户端数据达时,会另开辟线程处理,处理结束后销毁该线程。
在使用的时候,需要自己写类继承ServerManager实现自己server的功能,需要写类继承Session实现自己的数据处理,在server.properties中配置服务器端口号、客户端数据编码、需要加载的Session类(也就是自己写的继承自Session的类)、发送接收数据时的数据分隔符。
其中的Reactor模式是参考网上的,具体网址已经忘记了。
AcceptorReactor类:
/** * */ package zys.net.tcp; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Date; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import zys.ThreadRunnable; /** * @author Administrator */ final class AcceptorReactor extends ThreadRunnable { private ServerManager serverManager; private Class<Session> sessionClass; private ServerSocketChannel serverSocketChannel; private Selector selector; public AcceptorReactor() { super(); } /** * */ public void run() { try { serverSocketChannel = ServerSocketChannel.open(); try { ServerSocket sSocket = serverSocketChannel.socket(); try { sSocket.bind(new InetSocketAddress(serverManager.getPort())); serverSocketChannel.configureBlocking(false); selector = Selector.open(); try { SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); sk.attach(new Acceptor()); serverManager.logInfo("Listener Reactor started."); querySelector(); } finally { selector.close(); } } finally { sSocket.close(); } } finally { serverSocketChannel.close(); } } catch (IOException e) { e.printStackTrace(); } } /** * @param aSelector * @throws IOException */ private void querySelector() throws IOException { ExecutorService pool = Executors.newFixedThreadPool(50); try { while (!Thread.interrupted()) { int n = selector.select(); if (n != 0) { Iterator it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey) (it.next()); pool.execute((Runnable) key.attachment()); it.remove(); } } } } finally { pool.shutdown(); try { if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); } } catch (InterruptedException e) { pool.shutdownNow(); } } } class Acceptor implements Runnable { // inner public void run() { try { SocketChannel c = serverSocketChannel.accept(); if (c != null) { c.socket().setSoLinger(true, 0); serverManager.logInfo("One Session conncted."); Session session = sessionClass.newInstance(); session.setManager(serverManager); session.setSocketChannel(c); session.setConnTime(new Date()); session.setConnIP(c.socket().getInetAddress().getHostAddress()); session.setConnStatus(Session.CONN_STATUS_CONNECT); serverManager.registerSession(session); } } catch (Exception ex) { // log } } } public ServerManager getServerManager() { return serverManager; } public void setServerManager(ServerManager aServerManager) { serverManager = aServerManager; } public Class<Session> getSessionClass() { return sessionClass; } public void setSessionClass(Class<Session> aSessionClass) { sessionClass = aSessionClass; } }
其中,在run方法中注册连接监听,在querySelector中捕获连接请求,在Acceptor的run中实现对监听的处理。
SessionReactor类:
/** * */ package zys.net.tcp; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import zys.ThreadRunnable; /** * @author Administrator */ final class SessionReactor extends ThreadRunnable { private ServerManager serverManager; private Selector selector; private ArrayList<Session> preparedSessions; /** * */ public SessionReactor() { super(); preparedSessions = new ArrayList<Session>(); } /** * @param aSession * @throws IOException */ public void registerSession(Session aSession) throws IOException { synchronized (preparedSessions) { preparedSessions.add(aSession); } selector.wakeup(); } public void clearPreparedSessions() { synchronized (preparedSessions) { preparedSessions.clear(); } selector.wakeup(); } public void stop() throws Exception { super.stop(); selector.wakeup(); } /* * (non-Javadoc) * * @see java.lang.Runnable#run() */ public void run() { try { selector = Selector.open(); try { serverManager.logInfo("Session Reactor started."); querySelector(); } finally { selector.close(); } } catch (Exception e) { e.printStackTrace(); } } private void querySelector() throws Exception { ExecutorService pool = Executors.newFixedThreadPool(200); try { int preparedCount = 0; while (!Thread.interrupted()) { synchronized (preparedSessions) { preparedCount = preparedSessions.size(); if (preparedCount > 0) { Iterator<Session> sessionIt = preparedSessions.iterator(); while (sessionIt.hasNext()) { Session session = sessionIt.next(); SocketChannel channel = session.getSocketChannel(); channel.configureBlocking(false); SelectionKey skReader = channel.register(selector, SelectionKey.OP_READ); skReader.attach(new Reader(session)); serverManager.logInfo("One Session registered."); } preparedSessions.clear(); } } int n = selector.select(); if (n != 0) { Iterator it = selector.selectedKeys().iterator(); while (it.hasNext()) { // dispatch((SelectionKey) (it.next())); SelectionKey key = (SelectionKey) (it.next()); pool.execute((Runnable) key.attachment()); it.remove(); } } } Iterator<SelectionKey> it = selector.keys().iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey) (it.next()); Reader r = (Reader) (key.attachment()); key.cancel(); r.getSession().distroy(); } } finally { pool.shutdown(); try { if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); } } catch (InterruptedException e) { pool.shutdownNow(); } } } class Reader implements Runnable { // inner private Session session; public Reader(Session aSession) { session = aSession; } public void run() { if(!session.isActive()){ return; } SocketChannel channel = session.getSocketChannel(); int count; ByteBuffer buffer = null; try { synchronized(channel){ while(true){ buffer = ByteBuffer.allocate(10); count = channel.read(buffer); if (count > 0) { buffer = ByteBuffer.allocate(Integer.valueOf(new String(buffer.array(), 0, count, serverManager.getCharSet()))); count = channel.read(buffer); String sMsg = new String(buffer.array(), 0, count, serverManager.getCharSet()); serverManager.logDebugReceive(sMsg); session.onReceive(sMsg); }else{ if(count == -1){ if (session.isActive()) { session.distroy(); } } break; } } } } catch (Exception e) { e.printStackTrace(); serverManager.logError(this.getClass(), "Reader.run" , e.getMessage()); if (session.isActive()) { session.distroy(); } } } public Session getSession() { return session; } } public void setServerManager(ServerManager aServerManager) { serverManager = aServerManager; } }
其中,在registerSession中准备需要注册接收数据的Session对象,在querySelector中的synchronized (preparedSessions) {吧准备好的Session对象注册成接收数据监听,之后处理接收到数据的请求,Reader是处理接收到的数据的类。
所有代码已经上传,包zys.net.tcp中是核心类,包server中的是测试类,功能是客户端连接后每隔一秒向客户端发送字符串变量COST_PARAMS_STR的值,以实现客户端数据的事实刷新,发送格式是10位的代表数据长度的数字(比如0000000013),5位代表数据类别的数字(比如20000,代表每秒发的同步数据),紧接着是实际数据。
可以用telnet xxx.xxx.xxx.xxx 9999 测试。
所有核心代码以及测试代码已经上传,utf-8编码的。
比较担心的是当用户交互比较频繁的时候,服务器开辟线程过多,是否会引起服务器效率低下,不过经过测试在100个客户端的情况下,数据交互正常,丝毫看不出来延迟。比较失败的是zys.ThreadRunnable类,在某些地方引用了,这个类很脱裤子放屁。
欢迎批评,建议,请致zyongsheng83@163.com,谢谢!
- ServerLibSource.zip (10.5 KB)
- 下载次数: 98
发表评论
-
java - RMI一步一步学习
2007-07-31 21:34 582RMI,远程方法调用(Remote Method Invoca ... -
从追MM谈Java的23种设计模式
2007-07-31 21:37 558设计模式做为程序员的 ... -
jdk1.5新特性:
2007-08-01 20:48 10911. 范型(Generic) ArrayList< ... -
java中的String与常量池
2007-08-02 19:15 9321. 首先String不属于8种基本数据类型,String是一 ... -
java中的++运算符
2007-08-02 19:24 767int a = 0; a = a++; System.ou ... -
xml文件解析后不能删除
2007-08-03 21:52 887碰到的问题://XMLReader readerreader. ... -
变量初始化总结,防止以后忘记
2007-08-15 21:17 631在一个类中:1. 基本类型 static final - 编译 ...
相关推荐
一个基于JAVA的服务器框架,包含了服务器端和客户端的消息处理机制,有GUI管理界面
一个很好用的JAVA基于TCP和UDP的Socket通信框架,包括客户端和服务器,以及相应的回调方法
tcp服务器,线程池,RSA AES加密(可配置自定义加密或者不加密),Log4j记录日志
要查看完整的变更日志,请单击以下链接 - https://github.com/davidg95/JConn/releases ---该项目仍处于早期阶段。 请在 ... --- Java TCP 网络框架,使Java 网络应用程序的开发更容易。
一个基于棋牌、MMORPG游戏的分布式java游戏服务器,理论上可以无限水平扩展网关服,大厅服、游戏服达到人数承载。实现了集群注册中心,网关、登陆、后台服务器监控等通用服务器;封装了redis集群、mongodb等数据库...
Java整合springboot2.3+modbusTcp协议+netty高性能物联网服务源码 ...5、完全支持Modbus TCP 4种部署模式: TCP服务器master,TCP客户端slave,TCP服务器slave,TCP客户端master。 本人QQ412961810,有问题可以帮忙解答
也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty相当简化和流线化了网络应用的编程开发过程,例如,...
.net 稳定 高效 易用 可同步 TCP 通信框架 使用平台: WinXP,WIN7,WIN8,WINCE,WINPHONE。 使用.net 2.0 框架。 主要功能介绍: 1、可以代替 Oracle,Mysql客户端 在不安装Oracle,MySql客户端的情况下访问, ...
Java自己写的简单web文件服务器,有详细注释,实现简单的文件服务器交互功能,实现TCP多线程连接和UDP传输文件,适合Java网络编程初学者参考
集成于Web API、Web服务器、MQTT服务器、Socket TCP服务器、OPC UA服务器、Modbus DTU服务器、Modbus TCP服务器于一体的服务器框架。 无需IIS支持,仅需简单几个步骤即可完成部署。 利用WEB API功能灵活创建...
java TCP udp 网络编程 网络框架 多线程 网络服务器设计
这是一个支持分布式和移植的Java游戏服务器框架,可用于开发棋牌,回合制等游戏。基于netty实现高性能通讯,支持tcp,http,websocket等协议。支持消息加解密,攻击拦截,黑白名单机制。封装了redis缓存,mysql...
⬤ 集成于Web API、Web服务器、MQTT服务器、Socket TCP服务器、OPC UA服务器、Modbus DTU服务器、Modbus TCP服务器于一体的服务器框架。 ⬤ 无需IIS支持,仅需简单几个步骤即可完成部署。 ⬤ 利用WEB API功能灵活...
通过 ioGame 你可以很容易的搭建出一个集群无中心节点、集群自动化、分布式的网络服务器。包体小、启动快、内存占用少、更加的节约、无需配置文件、提供了优雅的路由访问权限控制。可同时支持多种连接方式:WS、UDP...
jbus基于java netty的TCP透传服务器功能接收透传网关的TCP连接将网关作为一个设备,向mqtt服务器发布来自设备的数据消息通过向mqtt服务器订阅命令消息,将来自mqtt服务器的命令消息,转发给网关工具服务器状态监视...
它通过Java NIO通过各种传输(例如TCP/IP和UDP/IP)提供了一个抽象的事件驱动的异步API。 Apache MINA通常被称为: NIO框架库, 客户端服务器框架库, 网络套接字库 Apache MINA带有许多子项目: AsyncWeb:基于...
Tcp服务端与客户端的JAVA实例源代码,一个简单的Java TCP服务器端程序,别外还有一个客户端的程序,两者互相配合可以开发出超多的网络程序,这是最基础的部分。 递归遍历矩阵 1个目标文件,简单! 多人聊天室 ...
Tcp服务端与客户端的JAVA实例源代码,一个简单的Java TCP服务器端程序,别外还有一个客户端的程序,两者互相配合可以开发出超多的网络程序,这是最基础的部分。 递归遍历矩阵 1个目标文件,简单! 多人聊天室 3...
Tcp服务端与客户端的JAVA实例源代码,一个简单的Java TCP服务器端程序,别外还有一个客户端的程序,两者互相配合可以开发出超多的网络程序,这是最基础的部分。 递归遍历矩阵 1个目标文件,简单! 多人聊天室 3...