【并发】4、文件锁,多线程队列拷贝文件,并读取的操作
最近有这样一个功能点,我实现了一个多线程的队列,生产线程ftp获取文件,然后扫描指定目录,获取文件基础信息,然后put进入队列,然后消费者通过这个信息解析文件进行入库,因为文件会比较多,所以到时候会起多个消费线程去解析数据,并且考虑到复用,不用的文件,消费消除要分化操作(策略模式),但是遇到一个问题,就是生产线程可能重复扫描到一个还未解析完成的文件
1。改进,新建一个consum目录,扫描到一个文件,就移动到consum目录,这样ftp操作就无脑向生产文件夹拉文件就行
2.还有个文件,就是如果消费线程如果解析不够快,也有可能导致文件被重复扫描到,那么为了避免,就需要扫到一个文件,就把文件移动到指定的目录下,等待消费
3.新文件,当生产队列不够快,消费队列空闲比较多了的时候,不同的文件操作,会导致线程锁死,因为生产线程文件还未写入,消费线程就开始读取,这个时候线程卡死,造成死锁
4.为了解决这个问题,我们再写文件的时候对文件上一个文件锁,读取文件的消费线程,等待生产线程写入完成之后再进行操作,这里有个demo来模拟这个线程锁
package io; import org.junit.Test; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; /** * @ProjectName: cutter-point * @Package: io * @ClassName: FileLockTest * @Author: xiaof * @Description: 测试多线程下的文件锁 * @Date: 2019/3/7 9:34 * @Version: 1.0 */ public class FileLockTest { //第一个线程不断吧文件从文件A拷贝到B,然后从B拷贝到A //第二个线程不断遍历B文件夹,并遍历B文件夹的数据,输出到控制台 private final static String baseDir1 = "C:\\Users\\xiaof\\Desktop\\1\\dir-t1"; private final static String baseDir2 = "C:\\Users\\xiaof\\Desktop\\1\\dir-t2"; @Test public void test1() { // String fileName = "多线程队列.png"; FileOutputStream fileOutputStream = null; FileInputStream fileInputStream = null; //反复吧文件拷贝到两个目录中 try { while(true) { System.out.println(Thread.currentThread() + "-休眠1s开始新一轮拷贝"); Thread.sleep(1000); //判断文件目录是否有文件 File dir1 = new File(baseDir1); String souDir = ""; String desDir = ""; if(dir1.listFiles().length > 0) { souDir = baseDir1; desDir = baseDir2; } else { //吧文件从2拷贝到1 souDir = baseDir2; desDir = baseDir1; } //吧文件从1拷贝到2 //1.遍历所有文件,依次上锁 File sourFileDir = new File(souDir); for(int i = 0; i < sourFileDir.listFiles().length; ++i) { File tmpFile = sourFileDir.listFiles()[i]; //输出 fileInputStream = new FileInputStream(tmpFile); fileOutputStream = new FileOutputStream(desDir + "/" + tmpFile.getName()); //对写的文件进行上锁,读的文件不需要上锁 FileChannel fileChannel = fileOutputStream.getChannel(); FileLock fileLock = null; //操作之前判断是否已经被锁住 while(true) { fileLock = fileChannel.tryLock(); if(fileLock != null) { //已上锁 break; } else { System.out.println(Thread.currentThread() + "-文件已被锁定,休眠1s"); Thread.sleep(1000); } } //拷贝数据 byte buf[] = new byte[1024]; int len = 0; while((len = fileInputStream.read(buf)) != -1) { fileOutputStream.write(buf, 0, len); } fileOutputStream.flush(); //最后删除源文件 fileLock.release();//解锁 fileChannel.close(); fileOutputStream.close(); fileInputStream.close(); fileInputStream = null; fileOutputStream = null; if(tmpFile.delete()) { System.out.println("删除源文件"); } } } } catch (Exception e) { e.printStackTrace(); } } @Test public void test2() { //读取文件并输出 FileOutputStream fileOutputStream = null; FileInputStream fileInputStream = null; try{ //判断文件是否存在 while (true) { Thread.sleep(1000); File dir1 = new File(baseDir1); String souDir = ""; String desDir = ""; souDir = baseDir1; desDir = baseDir2; // if(dir1.listFiles().length > 0) { // souDir = baseDir1; // desDir = baseDir2; // } else { // //吧文件从2拷贝到1 // souDir = baseDir2; // desDir = baseDir1; // } File sourFileDir = new File(souDir); for(int i = 0; i < sourFileDir.listFiles().length; ++i) { File tmpFile = sourFileDir.listFiles()[i]; //输出 fileInputStream = new FileInputStream(tmpFile); //拷贝数据 byte buf[] = new byte[1024]; int len = 0; ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); while((len = fileInputStream.read(buf)) != -1) { byteArrayOutputStream.write(buf, 0, len); } System.out.println("读取目录:" + souDir); System.out.println(byteArrayOutputStream.toString()); //最后删除源文件 fileInputStream.close(); fileInputStream = null; } } } catch (Exception e) { e.printStackTrace(); } } }