基于TCP协议的项目架构之Socket流传输的实现
项目背景
某银行的影像平台由于使用时间长,服务器等配置原因,老影像系统满足不了现在日益增长的数据量的需求,所以急需要升级改造。传统的影像平台使用的是Oracle数据库和简单的架构来存储数据(视频、图片等),所以很难满足现在的业务需求,其中最主要的就是存储下载等速度的影响,综合考虑之后,我们给出了升级改造方案,基于Http协议的数据传输和基于TCP协议的数据传输,按照行方的要求需要用TCP协议,我们最终采用的是Socket网络传输的方案。
TCP协议介绍
TCP是一种面向连接的、可靠的、基于字节流的传输层通信协议,在简化的计算机网络OSI模型中,它完成第四层传输层所指定的功能,TCP层位于IP层之上,应用层之下的中间层,不同主机的应用层之间经常需要可靠的、像管道一样的连接,但IP层不需要提供这样的机制,而是提供不可靠的包交换。当应用层向TCP层发送用于网间传输的、用8位字节标示的数据流,TCP会把数据流分割为适当长度的报文段,之后TCP把数据包传给IP层,由它来通过网络将包传输给接收端的实体TCP层。TCP是因特网中的传输层协议,使用3次握手协议建立连接。
Socket
TCP通信是严格区分客户端与服务端的,在通信时,必须先由客户端去连接服务器才能实现通信,服务器端不可以主动连接客户端,并且服务器要事先启动,等待客户端的连接。
在JDK中提供了两个类用于实现TCP程序,一个是ServerSocket类,用于表示服务器,一个是Socket类,用于表示客户端。通信时,首先创建代表服务器端的ServerSocket对象,该对象相当于开启一个服务,并等待客户端的连接,然后创建代表客户端的Socket对象向服务器端发出连接请求,服务器端响应请求,两者建立连接,开始通信。
下面是我们通过Socket建立的模型,是多线程的,首先看服务端代码:
/** * socket服务端 * @author 我心自在 * */ public class Main { private static Log logger=null; //线程池 public static ExecutorService executor =null; /** * 静态块,初始化 */ static{ logger=LogFactory.getLog(SocketMain.class); //线程池 executor = Executors.newFixedThreadPool(Integer.parseInt(new PropertiesUtil(). getProperties("config.properties").getProperty("UserThreads"))); } /** * 主程序入口 * @param args */ public static void main(String[] args) { ServerSocket server = null; Socket socket = null; PropertiesUtil PropertiesUtil = new PropertiesUtil(); try { //启动Socket服务端 server=newServerSocket(Integer.parseInt(PropertiesUtil.getProperties("config.properties") .getProperty("port"))); while (true) { //多线程接收客户端请求 socket = server.accept(); if (socket != null) { executor.execute(new Controller(socket)); } } } catch (IOException e) { logger.error("Main IO异常:"+e.getMessage(),e); } finally { try { if (server != null) { server.close(); } if (socket != null) { socket.close(); } } catch (IOException e) { logger.error("Main流关闭异常:"+e.getMessage(),e); } } } }
下面是业务处理类,支持多线程,主要用来处理业务
public class SocketController implements Runnable { private Socket socket; private static Log logger=LogFactory.getLog(SocketController.class); public SocketController(Socket socket) { super(); this.socket = socket; } public void run() { //读取文件流 String requestMethod=null; Map<String,Object> getMethAndNumMap=null; Map<String,Object> jsonMap=null; BufferedInputStream bis =null; OutputStream ops = null; BufferedWriter bw = null; BufferedOutputStream bos = null; Document doc=null; String XMLString=null; try { bis= new BufferedInputStream (socket.getInputStream()); //获取输出流 ops = socket.getOutputStream(); bos = new BufferedOutputStream(ops); bw = new BufferedWriter(new OutputStreamWriter(ops)); byte[] fileinfo=new byte[256]; try { bis.read(fileinfo); } catch (IOException e1){ logger.error("流读取异常...."+e1.getMessage(),e1); } if(fileinfo!=null){ fileInfoString=new String(fileinfo).trim(); } jsonMap=JSONUtil.jsonToMap(fileInfoString); requestMethod=(String) jsonMap.get("requestMethod"); if (!(requestMethod==null||"".equals(requestMethod))) { switch (requestMethod){ case "login": Login.login(XMLString,bw,doc);//登录接口 break; case "XXXX": XXX.xxx(); break; default: //请求方法错误 } } } catch (UnknownHostException e) { logger.error("Socket未知端口:"+e.getMessage(),e); } catch (IOException e) { logger.error("Controller读流流异常:"+e.getMessage(),e); }finally{ try { if (bw!=null) { bw.flush(); bw.close(); } if(ops!=null){ ops.close(); } if(bos!=null){ bos.close(); } bis.close(); socket.close(); } catch (IOException e) { logger.error("socket关闭异常:"+e.getMessage(),e); } } }
这里只贴出了部分代码,做个参考,基本思想就是,通过输入流接到客户端发送过来的报文,然后进行解析,为什么统一用字节流接受呢,因为我们的文件上传分两部分,一部分是文件信息,一部分是文件流,所以为了方便,统一使用字节流接收,根据字节流中的请求接口方法,调用对应的接口方法,完成业务处理。因为客户端的报文有两种,一种是XML类型的报文,另外一种是json格式的报文,这里只贴出了部分json格式的代码。差的只是一个XML的解析,解析方式很多,就不再赘述。
下面是客户端代码,以登录为例:
public class LoginTest { public static void main(String[] args) { String xml = "<?xml version=\"1.0\" encoding=\"utf-8\"?>" + "<root>" + "<requestMethod>login</requestMethod>" + "<userinfo>" + "<loginname>test</loginname>" + "<passwd>dmd1dnZndXY=</passwd>" + "</userinfo>" + "</root>"; Socket socket=null; BufferedWriter bw=null; BufferedReader br=null; try { socket=new Socket(new ConfigUtil().getValue("ipAddress"), Integer.parseInt(new ConfigUtil().getValue("port"))); bw=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); bw.write(xml); bw.newLine(); bw.flush(); br=new BufferedReader(new InputStreamReader(socket.getInputStream())); System.out.println("服务器返回报文:"+br.readLine()); bw.close(); br.close(); socket.close(); } catch (UnknownHostException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } }
客户端代码非常简单,基本就是以流的形式发送报文,发送到客户端,完成请求,收到服务器响应后,关闭连接,完成一次请求。
以上只是一个简单的Socket通信模型,具体项目中涉及到很多内容,这里无法一一列举。大致思路我们就是通过Socket通信,获取客户端发送过来的报文,然后对报文进行解析,根据请求方法,调用不同的业务接口,处理不同的业务,我们的业务包括对影像资料的增删改查,文件根据不同的大小存在大数据集群中,小文件存入Hbase,大文件存入HDFS,然后根据不同的场景去查询删除或者更新。这里边涉及到的存索引,用到了Elasticsearch5.4,用来做高级检索查询。