PCIE_DMA实例五:基于XILINX XDMA的PCIE高速采集卡
基于XDMA的PCIE高速采集卡,在KC705开发板上实现了2GByte/s的连续不间断采集,同时支持上位机发送文件或数据到FPGA,发送接口为fifo接口,发送速率约2GB/s,本采集卡还支持主机端通过PCIE配置FPGA端的用户寄存器,配置接口为local bus接口。整个采集系统简单易用、稳定高效。
PCIE_DMA实例五:基于XILINX XDMA的PCIE高速采集卡
一:前言
这一年关于PCIE高速采集卡的业务量激增,究其原因,发现百度“xilinx pcie dma”,出来的都是本人的博客。前期的博文主要以教程为主,教大家如何理解PCIE协议以及如何正确使用PCIE相关的IP核,因为涉及到商业道德,本人不能将公司自研的IP核以及相关工程应用放到网上。但为了满足大家对PCIE高速采集卡这块的业务需求,博主特地利用业余时间,使用XDMA这个xilinx官方IP,配合xilinx提供的linux驱动,在KC705开发板上实现了一套高速采集系统,该系统可对前端ADC产生的不大于2GB/s的连续或非连续数据进行实时采集,同时该采集卡具备数据发送功能,可以将用户文件或者内存中的数据写到FPGA的发送FIFO中,速率约为2GB/s,该采集卡具备上位机读写FPGA用户寄存器的功能,读写接口为local bus接口,方便易用。当然,如果您的高速采集卡需要大于2GB/s的采集速率,那博主只能拿出压箱底的另一套QDMA采集系统了,该系统在VC709上具备6.1GB/s的连续采集能力,要知道VC709的PCIE理论带宽都只有6.4GB/s,高达95%的传输效率真的很恐怖了,当然这套QDMA采集系统能有如此威力,主要拜FPGA大神马克杰所赐,马哥写的驱动充分发挥了系统的最大性能,吊打Xilinx的官方驱动。
二:前期准备
1、XILINX KC705开发板
2、pg195-pcie-dma.pdf
3、Vivado2018.2套件
4、X86主机一台,安装64位centos7.4 1708操作系统
5、XDMA linux驱动2018版本,GitHub上有下载。
三:系统框图
采集卡系统框图
从左到右从上到下依次介绍模块以及相应功能
1.data_gen
此模块模拟ADC产生的流数据,在本系统中,采样时钟250M,模拟AD数据位宽64位,故AD实时采样速率为2GB/s,可通过Send_En随时中止或继续数据产生。
2.axis data width converter
此模块将流数据64位位宽转换成128位位宽,时钟250M。
3.axis data fifo
此模块为流数据缓冲FIFO,深度不大,128足矣,真正的缓存要靠ddr完成。
4.YDMA
此模块为博主自己写的采集卡DMA控制器,该控制器的功能主要分四块:一,将收到的ST数据(axis接口)转换成MM数据(axi接口)写入DDR3;二,将需要发送的MM数据(axi接口)从DDR3中取出后转换成ST数据(axis接口)供用户使用;三,将XDMA输出的BYPASS接口转换成local_bus接口供用户读写寄存器使用;四,中断控制器,将写DDR和读DDR产生的中断送给XDMA,用户可设置包大小,中断包个数,中断超时时间。
5.user_reg_define
此模块为用户寄存器读写模块,读写接口为local bus接口,此用例中我们用它来配置Send_En。
6.axis_data_check
此模块用来校验上位机发下来的数据。
7.XDMA
此模块由上位机驱动控制,通过PCIE以SG_DMA的方式读写DDR3中的数据。
8.memory interface generator
此模块为DDR3控制器,使用AXI接口。
综上,整个采集卡包含两个方向的数据流向:FPGA>>PC:
data_gen->data_fifo->YDMA->DDR3->XDMA
PC>>FPGA:XDMA->DDR3->YDMA->data_check
当然FPGA逻辑部分最大的难点就在YDMA上,为了满足对任意包长、任意间隔的连续或非连续数据进行实时采集,需要产生大量的中断以及与之相对应的ddr缓存地址和缓存长度等中断信息,但XDMA驱动最大的bug恰恰出在中断上,为了规避XDMA的中断bug,又要提升整体的采集性能,需要对中断控制做精细设计。同时,对于那些突发的状况,比如采集数据突然中断的情况、急停急起的情况,都需要通过逻辑和软件的相互配合,才能跑出令人满意的采集效果。至于PC往FPGA发数据这个功能对于采集卡来说是锦上添花而已。因为有客户提出,需要将采集到的数据做处理,处理完后通过FPGA再发到另一个设备上,故我在YDMA上做了一个发数据的功能,用户接口也是大家最熟悉易用的axis(fifo)接口。因为YDMA包含一定技术含量,故该采集系统不能免费提供给大家,需要的用户可以联系我谈价格。
四:软件设计
XDMA的驱动是官方提供的,这里不做详细解读,总之XDMA驱动就是把PCIE DMA包成了多种字符设备:xdma_h2c,xdma_c2h,xdma_user,xdma_control,xdma_bypass,xdma_events
经过本人测试使用,我只推荐使用xdma_h2c,xdma_c2h,xdma_bypass,xdma_events这四个字符设备。xdma_h2c用来把数据从内存写到FPGA的DDR,xdma_c2h用来把数据从FPGA的DDR读到内存,xdma_bypass用来配置FPGA的用户寄存器,xdma_events用来读取用户中断。
下面我们来看看采集卡的测试程序我们是怎么写的,里面给出了详细的注释:
#define _BSD_SOURCE #define _XOPEN_SOURCE 500 #include <assert.h> #include <time.h> #include <fcntl.h> #include <getopt.h> #include <stdint.h> #include <string.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <fcntl.h> #include <memory.h> #include <sys/stat.h> #include <sys/types.h> #include <sys/ioctl.h> #include <sys/time.h> #include <sys/resource.h> #include <sys/wait.h> #include <pthread.h> #include <sched.h> #include <semaphore.h> #include <sys/mman.h> #include <errno.h> //#include "dma_utils.c" #define FATAL do { fprintf(stderr, "Error at line %d, file %s \n", __LINE__, __FILE__); exit(1); } while(0) #define DEVICE_NAME_H2C "/dev/xdma0_h2c_0" #define DEVICE_NAME_C2H "/dev/xdma0_c2h_0" #define DEVICE_NAME_REG "/dev/xdma0_bypass" #define MAP_SIZE (1*1024*1024) #define MAP_MASK (MAP_SIZE - 1) #define RCV_EN_CMD 0 #define RX_DM_RST 1 struct timezone tz_time; struct timeval tv_time3; struct timeval tv_time4; pthread_t rcv_tid ; pthread_t print_sta_id; pthread_t event_thread; int work = 0 ; int lxcj =0; int int_rc; unsigned int lastData = 0; unsigned int rcvPktNum = 0; unsigned long rcvBytes = 0; unsigned long rcvBytes_l = 0; unsigned int errnum = 0 ; int c2h_fd ; int h2c_fd ; int control_fd; int interrupt_fd; void *control_base; static sem_t int_sem_rx; static sem_t int_sem_tx; char *device_c2h = DEVICE_NAME_C2H; char *device_h2c = DEVICE_NAME_H2C; char *device_reg = DEVICE_NAME_REG; static void write_control(void *base_addr,int offset,uint32_t val);//写用户寄存器 static uint32_t read_control(void *base_addr,int offset);//读用户寄存器 /*开中断*/ int open_event(char *devicename) { int fd; fd=open(devicename,O_RDWR|O_SYNC ); if(fd==-1) {printf("open event error\n"); return -1;} return fd; } /*获取用户中断*/ int read_event(int fd) { int val; read(fd,&val,4); return val; } /*打开字符设备*/ static int open_control(char *filename) { int fd; fd = open(filename, O_RDWR | O_SYNC); if(fd == -1) { printf("open control error\n"); return -1; } return fd; } /*获取设备对应的内存映射地址*/ static void *mmap_control(int fd,long mapsize) { void *vir_addr; vir_addr = mmap(0, mapsize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); return vir_addr; } /*写用户寄存器*/ static void write_control(void *base_addr,int offset,uint32_t val) { //uint32_t writeval = htoll(val); *((uint32_t *)(base_addr+offset)) = val; } /*读用户寄存器*/ static uint32_t read_control(void *base_addr,int offset) { uint32_t read_result = *((uint32_t *)(base_addr+offset)); //read_result = ltohl(read_result); return read_result; } /*打印进程,5秒打印一次统计信息,包含收到的包个数,错误包个数,以及当前的采集速率*/ void *printStatus() { unsigned int lostNum = 0 ; unsigned int allNum = 0 ; while( work ==1) { sleep(5); printf("rcvPkt[%8x], err[%8x] , rate[%d]MBps\n", rcvPktNum, errnum , (rcvBytes-rcvBytes_l)/5/1000000 ); rcvBytes_l = rcvBytes ; } } /*数据校验,因为模拟ADC数据是累加数,故收到的当前包的第一个数应该是上一次包的第一个数加上上次包的长度*/ void checkData(unsigned int *add, unsigned int len) { if(lastData != add[0] & lastData!=0 ) { errnum ++; if(errnum<20 )printf("l[%8x], n[%8x][%8x], [%8x], p[%x] , len[%d]\n", lastData , add[0], add[1], add[0] - lastData , (add[0] - lastData)/len , len) ; } rcvBytes = rcvBytes + len ; lastData = add[0] + len/8 ; } /*ADC连续数据采集处理进程*/ void *procPkt( ) { int i; int rxint_rc;//接收中断信息寄存器返回值 unsigned int * rxBuf;//接收数据存放的地址 int rxlen; //接收数据的长度 int c2h0_inbuf =0; c2h_fd= open(device_c2h, O_RDWR | O_NONBLOCK);//打开xdma_c2h字符设备 posix_memalign((void **)&rxBuf, 1024, 8*1024*1024);//开一个8M的内存空间用于暂存接收数据 printf("procAD up. \n" ); while( work ==1 ) { if(lxcj==1) { //assert(c2h_fd >= 0); sem_wait(&int_sem_rx); //等待用户接收中断 rxint_rc=read_control(control_base,0x10020);//从接收中断寄存器中获取接收中断相关的中断信息 int icnt; if((rxint_rc&0x80000000)>>31) icnt= rxint_rc &0x00ffffff;//接收中断寄存器bit31表示是否有接收中断,bit23-bit0表示有几个中断包 else continue; write_control(control_base,0x10020,icnt);//清中断寄存器,写入的内容为即将要处理的中断包个数 for(i=0;i<icnt;i++) //处理中断包 { int count = read_control(control_base,0x10018);//读接收中断状态FIFO,获取当前中断包的实际长度 off_t off = lseek(c2h_fd, c2h0_inbuf, SEEK_SET);//和FPGA协商好从DDR3的0地址开始存放接收数据,故软件从0地址开始取数据 rxlen = read(c2h_fd, rxBuf, count);//从DDR中取出数据放入rxbuffer write_control(control_base,0x10018,1); //清接收中断FIFO c2h0_inbuf = c2h0_inbuf + 0x400000 ;//本测试用例中设置的中断包最大长度为4M if(c2h0_inbuf==0x40000000) c2h0_inbuf = 0;//当DDR3偏移达到1G的时候重新归零 if(rxlen > 0) rcvPktNum ++ ;//统计接收包个数 checkData(rxBuf , rxlen) ;//校验接受到的包是否为连续数 } } } pthread_exit(0); } /*写数据进程,此用例中为发送任意大小文件*/ void h2c_process(char *filename) { h2c_fd= open(device_h2c, O_RDWR | O_NONBLOCK);//打开xdma_h2c字符设备 assert(h2c_fd>0); uint32_t send_len;//单次数据包发送长度,本测试用例中以4M为单位 uint32_t send_addr=0x0; int rc; int file_fd; uint64_t size;//发送文件的实际大小 uint64_t snd_cnt; struct stat fileStat; file_fd = open(filename, O_RDONLY);//打开发送文件 assert(file_fd >= 0); rc= stat(filename, &fileStat ); size = fileStat.st_size ; //获取发送文件的大小 snd_cnt =size; char *sendbuff = NULL; posix_memalign((void **)&sendbuff, 1024/*alignment*/, 8*1024*1024);//开一块8M的内存空间 gettimeofday(&tv_time3, &tz_time); while(size!=0) { sem_wait(&int_sem_tx);//等待发送中断信号量,该信号量初始值为9 if(size>0x400000) send_len = 0x400000; else send_len = size; off_t off_file = lseek(file_fd, send_addr, SEEK_SET); rc = read(file_fd, sendbuff, send_len);//将数据从文件中读到sendbuffer off_t off_h2c = lseek(h2c_fd, send_addr, SEEK_SET); //printf("send_len=%d\n,sendbuff=%x\n",send_len,sendbuff); rc = write(h2c_fd, sendbuff, send_len);//将sendbuffer中的数据发送到DDR3 write_control(control_base,0x11000,send_addr);//将DDR3数据缓存地址写入FPGA端的发送地址寄存器 write_control(control_base,0x11010,send_len); //将DDR3数据缓存长度写入FPGA端的发送长度寄存器 //printf("rc=%d\n",rc); assert(rc == send_len); size = size - send_len; send_addr = send_addr + send_len; if(send_addr==0x40000000) send_addr = 0;//发送数据的DDR3缓存偏移地址为1G时归零 } gettimeofday(&tv_time4, &tz_time); printf("write done\n"); printf(" 时间 %ld useconds\n", (tv_time4.tv_sec - tv_time3.tv_sec) * 1000000 + tv_time4.tv_usec - tv_time3.tv_usec); printf(" 数据量 %ld 字节\n", snd_cnt); printf(" 带宽 %ld MB/s\n", snd_cnt / ((tv_time4.tv_sec - tv_time3.tv_sec) * 1000000 + tv_time4.tv_usec - tv_time3.tv_usec)); if (file_fd >= 0) close(file_fd); free(sendbuff); } /*中断处理进程*/ void *event_process() { int i; int txint_rc; interrupt_fd = open_event("/dev/xdma0_events_0"); //打开用户中断 while(work==1) { read_event(interrupt_fd); //获取用户中断 int_rc=read_control(control_base,0x00000); //读总中断寄存器 switch(int_rc) { case 1: //接收中断 sem_post(&int_sem_rx); break; case 2: //发送中断 txint_rc=read_control(control_base,0x11020); //从发送中断寄存器中获取发送中断相关的中断信息 int txicnt; if((txint_rc&0x80000000)>>31) txicnt= txint_rc &0x00ffffff;//发送中断寄存器bit31表示是否有发送中断,bit23-bit0表示发出了几个中断包 else break; write_control(control_base,0x11020,txicnt);//清中断寄存器,写入的内容为即将要处理的中断包个数 for(i=0;i<txicnt;i++) sem_post(&int_sem_tx); //为每个发出的中断包释放一个信号量 break; default: break; } } pthread_exit(0); } int main(int argc, char *argv[]) { ssize_t rc; char inp ; unsigned int * rxBuf; posix_memalign((void **)&rxBuf, 1024, 1024*1024*1024); control_fd = open_control("/dev/xdma0_bypass");//打开bypass字符设备 control_base = mmap_control(control_fd,MAP_SIZE);//获取bypass映射的内存地址 //c2h_fd= open(device_c2h, O_RDWR | O_NONBLOCK); //h2c_fd= open(device_h2c, O_RDWR | O_NONBLOCK); sem_init(&int_sem_rx, 0, 0); sem_init(&int_sem_tx, 0, 9); work =1 ; pthread_create(&rcv_tid , NULL, procPkt, NULL); pthread_create(&print_sta_id, NULL, printStatus, NULL ); pthread_create(&event_thread, NULL, event_process, NULL); write_control(control_base,0x10028,0xFFFFFF08);//写接收中断控制寄存器,bit31-bit8为中断超时时间,bit7-bit0为多少个包产生一次中断 write_control(control_base,0x11028,0xFFFFFF00);//写发送中断控制寄存器,bit31-bit8为中断超时时间,bit7-bit0为多少个包产生一次中断 char *file_write = "/run/media/root/software/CentOS-7-x86_64-Everything-1708/CentOS-7-x86_64-Everything-1708.iso"; while(inp!=\'o\') { inp = getchar(); switch(inp) { case \'w\': h2c_process(file_write); break; case \'r\': write_control(control_base,0x10030,4);//复位接收DMA rc=read( c2h_fd, rxBuf, 1*1024); printf("rc=%x\n",rc); break; case \'s\': write_control(control_base,0x10030,4);//复位接收DMA lxcj=1; write_control(control_base,0x10038,1);//使能接收 break; case \'e\': write_control(control_base,0x10038,0);//停止接收 sleep(2); lxcj=0; break; case \'t\': write_control(control_base,0x30008,1);//使能模拟ADC数据发送 break; case \'p\': write_control(control_base,0x30008,0);//暂停模拟ADC数据发送 break; case \'o\': write_control(control_base,0x10030,1);//复位接收DMA write_control(control_base,0x11030,1);//复位发送DMA break; default: break; } } work =0 ; out: close(c2h_fd); close(h2c_fd); return rc; }
五:测试结果
本采集系统测试环境为X86主机,CPU为Intel 酷睿i7 8700K,FPGA选用xilinx公司的KC705开发板,操作系统为centos7.4 1708,内核版本3.10.0-693,博主最近会在windows上做一版测试程序,到时候分享给需要的朋友。
六:结束语
本博文展示的PCIE高速采集系统主要面向有这方面工程应用需求的朋友,不建议初学者作为学习使用。本人从事高速总线接口已七年有余,积累了大量总线相关的FPGA设计经验,主要涉及FC、rapidio、千兆、万兆以太网、lvds、mlvds、can、422、1553B。同时也可承接算法加速或者视频图像处理等相关项目。最后放上一段基于QDMA(非xilinx的官方IP)的PCIe高速采集卡在VC709上的测试结果,致敬前辈马哥!