Java实现轻量级IM(服务端)

October 11, 2018 4 min read Author: Yu

本文的目的旨在用尽量少的代码封装一个Socket的C/S架构,并实现简单应用。

运行环境:服务端:ubuntu16.04.1+jre10 客户端1:Windows10+jre10 客户端2:Android8.0

一个S端对应多个C端,每个C端的连接由一个连接对象单元管理,然后由一个管理类来协调这些连接单元,连接单元作为管理类的内部类。管理类的结构如下所示:

public class ConnectServer extends Thread {
List<ConnectUnit> connect_list;
private class ConnectUnit extends Thread {
}
}

管理类一般在程序中作为附属功能(主线程中还需要做其他操作),为了不阻塞主线程,管理类继承Thread新开线程执行,每个连接单元需要单独连接一个客户端,所以也继承Thread在线程中实现。服务器开启Socket后,每连接一个客户端就初始化一个ConnectUnit对象然后加入list表中,连接成功后,每个ConnectUnit都有收发数据的能力,但是接收到数据后递交给上层ConnectServer处理,而发送数据的方法自己不调用,由ConnectServer中的方法统一调用。

首先来完善内部类ConnectUnit,一个简单的Socket的服务端主要用到ServerSocket类和Socket类,每个Socket对应一个具体的连接,每个ServerSocket对应一个服务主机,所以每个ConnectUnit中都应有个Socket来作为当前的连接,ServerSocket则放在ConnectServer中。在ServerSocket初始化完毕后,调用ServerSocket的accept方法来获取连接,该方法会阻塞当前线程,直到连接被建立(有客户端连接),并返回一个Socket对象,我们需要初始化一个新的ConnectUnit对象,将这个Socket对象保存在ConnectUnit中,并用此初始化相关的IO流,ConnectUnit的构造方法如下所示:

public ConnectUnit(Socket socket) {
this.socket = socket;
try {
in = new ObjectInputStream(socket.getInputStream());
out = new ObjectOutputStream(socket.getOutputStream());
} catch (IOException e) {
System.out.println("ObjectIOStream initialize failed.");
}
}

当然,ConnectUnit中也要声明相应的对象

ObjectInputStream in;
ObjectOutputStream out;
Socket socket;

这里有个小坑,一般基本的IO流都是用DataInputStream等,这里为了方便直接传输封装好的对象,所以采用ObjectInputStream和ObjectOutputStream,但是和前者不同的是ObjectIO流的构造方法是阻塞方法,ObjectIntputStream的构造方法会一直阻塞,直到从对应的连接的output流中获取到发送来的header,构造方法才会执行完毕;同理,ObjectOutputStream的构造方法也会阻塞,直到成功向对方发送一个Header。如果不额外开启线程的话,解决此问题最快的方法是S端初始化IO流的顺序和C端相反,否则两端将一直阻塞。

所有的连接单元对信息的处理方式均为被动接收,主动发送,所以将接收数据的相关代码写在run方法中:

@Override
public void run() {
try {
while (true) {
broadcast(in.readObject(), this);
}
} catch (ClassNotFoundException | IOException e) {
remove(this);
System.out.println("device lost connect.");
}
}

在while循环中便是循环接收,然后调用broadcast方法进行处理,这里暂且不管broadcast中的实现,在catch中调用了ConnectServer中的remove方法来移除当前连接,这里也先不管内部实现。在连接单元的线程开启后,进行持续的信息接收处理,若要主动发送,还需要完成一个发送方法,发送方法很简单,传入指定的对象,并开启线程输出:

public void sendData(Object messpkg) {
new Thread(() -> {
try {
out.writeObject(messpkg);
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}

除此之外,ConnectUnit还有一个closegetIP方法,用于关闭socket以及IO流和获取客户端IP:

public void close() {
try {
socket.close();
in.close();
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public String getIP() {
return socket.getInetAddress().toString();
}

以上就是ConnectUnit 中的核心内容,还有一些扩展内容这里先暂且不表。ConnectUnit主要负责收发数据,以及为上层ConnectServer提供可调用的方法,主要的连接处理以及内容分发都写在上层的ConnectServer中,同时ConnectServer负责服务端的运行,所以ConnectServer至关重要,相关的属性以及构造方法如下所示:

public class ConnectServer extends Thread {
private int port;
private ServerSocket server;
private List<ConnectUnit> connect_list;
public ConnectServer(int port) {
try {
this.port = port;
this.server = new ServerSocket(this.port);
connect_list = new ArrayList<>();
} catch (IOException e) {
e.printStackTrace();
}
}
}

关键属性有负责开启Socket的端口号以及ServerSocket对象,管理连接单元的List对象,ConnectServer的结构如下(忽略ConnectUnit):

public class ConnectServer extends Thread {
private int port;
private ServerSocket server;
private List<ConnectUnit> connect_list;
public ConnectServer(int port) {
try {
this.port = port;
this.server = new ServerSocket(this.port);
connect_list = new ArrayList<>();
} catch (IOException e) {
e.printStackTrace();
}
}
}

以上是ConnectServer和ConnectUnit的基本属性,下面是实现整个架构的方法,我们先从ConnectServer的开启连接方法说起。在构造函数成功执行后,我们就可以调用ServerSocket的accept方法等待连接,在成功连接后,用得到的socket初始化一个ConnectUnit对象,然后添加进List,我们为该方法起名为addConnect:

private void addConnect() {
try {
System.out.println("waiting for connect..");
Socket socket = server.accept();
ConnectUnit connect = new ConnectUnit(socket);
connect.start();
connect_list.add(connect);
System.out.println("connect success,waiting for data,connect count:" + connect_list.size());
} catch (IOException e) {
e.printStackTrace();
}
}

以上为建立一个新连接的方法,为了方便测试,将该方法用死循环包裹并放入run方法中,来不停的接受新的连接请求。在ConnectServer中,还写了之前提到的broadcast方法和remove方法,boardcast方法用于向list中的所有客户端广播一条信息,remove方法用于移除list中的某个连接单元,broadcast方法如下所示:

public void broadcast(Object o) {
for (ConnectUnit c : connect_list) {
c.sendData(o);
System.out.println("sending data to " + c.getIP());
}
}
public void broadcast(Object o, ConnectUnit self) {
for (ConnectUnit c : connect_list) {
if (!c.equals(self)) {
c.sendData(o);
System.out.println("sending data to " + c.getIP());
}
}
}

broadcast的内部遍历所有连接单元,调用它们的sendData方法向连接对象发送数据,该方法有一个重载方法,该重载方法是为了在连接单元中调用,例如一个客户端想要向同服务端下的其他所有客户端广播一条信息,调用第一个方法的话会向自己也发送一份信息,这显然是不必要的。

remove方法的实现较为简单,调用ConnectUnit中的close方法,然后在list中移除该对象。remove方法在ConnectUnit的run方法中的接收数据的抛出异常的catch块中执行,在连接中断后,remove方法被调用,该ConnectUnit对象被回收。

整个服务端的完整代码如下:

package server.myconnect;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
public class ConnectServer extends Thread {
private int port;
private ServerSocket server;
private List<ConnectUnit> connect_list;
public ConnectServer(int port) {
try {
this.port = port;
this.server = new ServerSocket(this.port);
connect_list = new ArrayList<>();
} catch (IOException e) {
e.printStackTrace();
}
}
public void broadcast(Object o, ConnectUnit self) {
for (ConnectUnit c : connect_list) {
if (!c.equals(self)) {
c.sendData(o);
System.out.println("sending data to " + c.getIP());
}
}
}
private void addConnect() {
try {
System.out.println("waiting for connect..");
Socket socket = server.accept();
ConnectUnit connect = new ConnectUnit(socket);
connect.start();
connect_list.add(connect);
System.out.println("connect success,waiting for data,connect count:" + connect_list.size());
} catch (IOException e) {
e.printStackTrace();
}
}
public void remove(ConnectUnit cu) {
cu.close();
connect_list.remove(cu);
}
@Override
public void run() {
while (true)
addConnect();
}
//内部私有类
private class ConnectUnit extends Thread {
ObjectInputStream in;
ObjectOutputStream out;
Socket socket;
public ConnectUnit(Socket socket) {
this.socket = socket;
try {
in = new ObjectInputStream(socket.getInputStream());
out = new ObjectOutputStream(socket.getOutputStream());
} catch (IOException e) {
System.out.println("201:ObjectIOStream initialize failed.");
}
}
public void close() {
try {
socket.close();
in.close();
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
try {
while (true) {
broadcast(in.readObject(), this);
}
} catch (ClassNotFoundException | IOException e) {
remove(this);
System.out.println("device lost connect.");
}
}
public void sendData(Object messpkg) {
new Thread(() -> {
try {
out.writeObject(messpkg);
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
public String getIP() {
return socket.getInetAddress().toString();
}
}
}