本文的目的旨在用尽量少的代码封装一个Socket的C/S架构,并实现简单应用。
运行环境:服务端:ubuntu16.04.1+jre10 客户端1:Windows10+jre10 客户端2:Android8.0
一个S端对应多个C端,每个C端的连接由一个连接对象单元管理,然后由一个管理类来协调这些连接单元,连接单元作为管理类的内部类。管理类的结构如下所示:
public class ConnectServer extends Thread { List<ConnectUnit> connect_list; private class ConnectUnit extends Thread { }}管理类一般在程序中作为附属功能(主线程中还需要做其他操作),为了不阻塞主线程,管理类继承Thread新开线程执行,每个连接单元需要单独连接一个客户端,所以也继承Thread在线程中实现。服务器开启Socket后,每连接一个客户端就初始化一个ConnectUnit对象然后加入list表中,连接成功后,每个ConnectUnit都有收发数据的能力,但是接收到数据后递交给上层ConnectServer处理,而发送数据的方法自己不调用,由ConnectServer中的方法统一调用。
首先来完善内部类ConnectUnit,一个简单的Socket的服务端主要用到ServerSocket类和Socket类,每个Socket对应一个具体的连接,每个ServerSocket对应一个服务主机,所以每个ConnectUnit中都应有个Socket来作为当前的连接,ServerSocket则放在ConnectServer中。在ServerSocket初始化完毕后,调用ServerSocket的accept方法来获取连接,该方法会阻塞当前线程,直到连接被建立(有客户端连接),并返回一个Socket对象,我们需要初始化一个新的ConnectUnit对象,将这个Socket对象保存在ConnectUnit中,并用此初始化相关的IO流,ConnectUnit的构造方法如下所示:
public ConnectUnit(Socket socket) { this.socket = socket; try { in = new ObjectInputStream(socket.getInputStream()); out = new ObjectOutputStream(socket.getOutputStream()); } catch (IOException e) { System.out.println("ObjectIOStream initialize failed."); }}当然,ConnectUnit中也要声明相应的对象
ObjectInputStream in;ObjectOutputStream out;Socket socket;这里有个小坑,一般基本的IO流都是用DataInputStream等,这里为了方便直接传输封装好的对象,所以采用ObjectInputStream和ObjectOutputStream,但是和前者不同的是ObjectIO流的构造方法是阻塞方法,ObjectIntputStream的构造方法会一直阻塞,直到从对应的连接的output流中获取到发送来的header,构造方法才会执行完毕;同理,ObjectOutputStream的构造方法也会阻塞,直到成功向对方发送一个Header。如果不额外开启线程的话,解决此问题最快的方法是S端初始化IO流的顺序和C端相反,否则两端将一直阻塞。
所有的连接单元对信息的处理方式均为被动接收,主动发送,所以将接收数据的相关代码写在run方法中:
@Overridepublic void run() { try { while (true) { broadcast(in.readObject(), this); } } catch (ClassNotFoundException | IOException e) { remove(this); System.out.println("device lost connect."); }}在while循环中便是循环接收,然后调用broadcast方法进行处理,这里暂且不管broadcast中的实现,在catch中调用了ConnectServer中的remove方法来移除当前连接,这里也先不管内部实现。在连接单元的线程开启后,进行持续的信息接收处理,若要主动发送,还需要完成一个发送方法,发送方法很简单,传入指定的对象,并开启线程输出:
public void sendData(Object messpkg) { new Thread(() -> { try { out.writeObject(messpkg); } catch (IOException e) { e.printStackTrace(); } }).start();}除此之外,ConnectUnit还有一个close和getIP方法,用于关闭socket以及IO流和获取客户端IP:
public void close() { try { socket.close(); in.close(); out.close(); } catch (IOException e) { e.printStackTrace(); }}
public String getIP() { return socket.getInetAddress().toString();}以上就是ConnectUnit 中的核心内容,还有一些扩展内容这里先暂且不表。ConnectUnit主要负责收发数据,以及为上层ConnectServer提供可调用的方法,主要的连接处理以及内容分发都写在上层的ConnectServer中,同时ConnectServer负责服务端的运行,所以ConnectServer至关重要,相关的属性以及构造方法如下所示:
public class ConnectServer extends Thread { private int port; private ServerSocket server; private List<ConnectUnit> connect_list;
public ConnectServer(int port) { try { this.port = port; this.server = new ServerSocket(this.port); connect_list = new ArrayList<>(); } catch (IOException e) { e.printStackTrace(); } }}关键属性有负责开启Socket的端口号以及ServerSocket对象,管理连接单元的List对象,ConnectServer的结构如下(忽略ConnectUnit):
public class ConnectServer extends Thread { private int port; private ServerSocket server; private List<ConnectUnit> connect_list;
public ConnectServer(int port) { try { this.port = port; this.server = new ServerSocket(this.port); connect_list = new ArrayList<>(); } catch (IOException e) { e.printStackTrace(); } }}以上是ConnectServer和ConnectUnit的基本属性,下面是实现整个架构的方法,我们先从ConnectServer的开启连接方法说起。在构造函数成功执行后,我们就可以调用ServerSocket的accept方法等待连接,在成功连接后,用得到的socket初始化一个ConnectUnit对象,然后添加进List,我们为该方法起名为addConnect:
private void addConnect() { try { System.out.println("waiting for connect.."); Socket socket = server.accept(); ConnectUnit connect = new ConnectUnit(socket); connect.start(); connect_list.add(connect); System.out.println("connect success,waiting for data,connect count:" + connect_list.size()); } catch (IOException e) { e.printStackTrace(); }}以上为建立一个新连接的方法,为了方便测试,将该方法用死循环包裹并放入run方法中,来不停的接受新的连接请求。在ConnectServer中,还写了之前提到的broadcast方法和remove方法,boardcast方法用于向list中的所有客户端广播一条信息,remove方法用于移除list中的某个连接单元,broadcast方法如下所示:
public void broadcast(Object o) { for (ConnectUnit c : connect_list) { c.sendData(o); System.out.println("sending data to " + c.getIP()); }}
public void broadcast(Object o, ConnectUnit self) { for (ConnectUnit c : connect_list) { if (!c.equals(self)) { c.sendData(o); System.out.println("sending data to " + c.getIP()); } }}broadcast的内部遍历所有连接单元,调用它们的sendData方法向连接对象发送数据,该方法有一个重载方法,该重载方法是为了在连接单元中调用,例如一个客户端想要向同服务端下的其他所有客户端广播一条信息,调用第一个方法的话会向自己也发送一份信息,这显然是不必要的。
remove方法的实现较为简单,调用ConnectUnit中的close方法,然后在list中移除该对象。remove方法在ConnectUnit的run方法中的接收数据的抛出异常的catch块中执行,在连接中断后,remove方法被调用,该ConnectUnit对象被回收。
整个服务端的完整代码如下:
package server.myconnect;
import java.io.IOException;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.net.ServerSocket;import java.net.Socket;import java.util.ArrayList;import java.util.List;
public class ConnectServer extends Thread { private int port; private ServerSocket server; private List<ConnectUnit> connect_list;
public ConnectServer(int port) { try { this.port = port; this.server = new ServerSocket(this.port); connect_list = new ArrayList<>(); } catch (IOException e) { e.printStackTrace(); } }
public void broadcast(Object o, ConnectUnit self) { for (ConnectUnit c : connect_list) { if (!c.equals(self)) { c.sendData(o); System.out.println("sending data to " + c.getIP()); } } }
private void addConnect() { try { System.out.println("waiting for connect.."); Socket socket = server.accept(); ConnectUnit connect = new ConnectUnit(socket); connect.start(); connect_list.add(connect); System.out.println("connect success,waiting for data,connect count:" + connect_list.size()); } catch (IOException e) { e.printStackTrace(); } }
public void remove(ConnectUnit cu) { cu.close(); connect_list.remove(cu); }
@Override public void run() { while (true) addConnect(); }
//内部私有类 private class ConnectUnit extends Thread {
ObjectInputStream in; ObjectOutputStream out; Socket socket;
public ConnectUnit(Socket socket) { this.socket = socket; try { in = new ObjectInputStream(socket.getInputStream()); out = new ObjectOutputStream(socket.getOutputStream()); } catch (IOException e) { System.out.println("201:ObjectIOStream initialize failed."); } }
public void close() { try { socket.close(); in.close(); out.close(); } catch (IOException e) { e.printStackTrace(); } }
@Override public void run() { try { while (true) { broadcast(in.readObject(), this); } } catch (ClassNotFoundException | IOException e) { remove(this); System.out.println("device lost connect."); } }
public void sendData(Object messpkg) { new Thread(() -> { try { out.writeObject(messpkg); } catch (IOException e) { e.printStackTrace(); } }).start(); }
public String getIP() { return socket.getInetAddress().toString(); } }
}