大数据离线分析平台 JavaSDK数据收集引擎编写
JavaSDK设计规则
JavaSDK提供两个事件触发方法,分别为onChargeSuccess和onChargeRefund。我们在java sdk中通过一个单独的线程来发送线程数据,这样可以减少对业务系统的延时性。
SDK测试
启动集群上的hdfs+nginx+flume进程,通过模拟数据的发送然后将数据发送到nginx服务器中,查看最终是否在hdfs中有数据的写入。
命令:
start-dfs.sh: 启动hdfs命令
su root:切换用户
service nginx restart: 启动nginx进程
启动flume进程:
进入flume安装根目录,执行命令:
flume-ng agent –conf ./conf/ –conf-file ./conf/test2.conf –name agent &
工程目录结构
AnalyticsEngineSDK如下:
package com.kk.ae.sdk; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; import java.util.logging.Logger; /* * 分析引擎sdk java服务器数据收集 * */ public class AnalyticsEngineSDK { //日志记录对象 private static final Logger log=Logger.getGlobal(); //请求url的主体部分 public static final String accessUrl="http://hadoop-001:8090/kkImg.gif"; public static final String platformName="java_server"; public static final String sdkName="jdk"; private static final String version = "1"; /** * 触发订单支付成功事件,发送事件数据到服务器 * * @param orderId * 订单支付id * @param memberIdd * 订单支付会员id * @return 如果发送数据成功(加入到发送队列中),那么返回true;否则返回false(参数异常&添加到发送队列失败). * @throws InterruptedException */ public static boolean chargeSuccess(String orderId,String memberId) throws InterruptedException { if (orderId!=null&&!orderId.isEmpty()&&memberId!=null&&!memberId.isEmpty()) { Map<String, String> map=new HashMap<String,String>(); map.put("u_mid", memberId); map.put("oid", orderId); map.put("c_time", String.valueOf(System.currentTimeMillis())); map.put("ver", version); map.put("en", "e_cs"); map.put("p1", platformName); map.put("sdk", sdkName); //创建url String url= buildUrl(map); // 发送url&将url加入到队列 SendDataMonitor.addSendUrl(url); System.out.println(url); return true; } else { log.log(Level.WARNING, "订单id和会员id不能为空"); return false; } } /** * 触发订单退款事件,发送退款数据到服务器 * * @param orderId * 退款订单id * @param memberIdd * 退款会员id * @return 如果发送数据成功,返回true。否则返回false。 * @throws InterruptedException */ public static boolean chargeRefund(String orderId,String memberId) throws InterruptedException { if (orderId!=null&&!orderId.isEmpty()&&memberId!=null&&!memberId.isEmpty()) { Map<String, String> map=new HashMap<String,String>(); map.put("u_mid", memberId); map.put("oid", orderId); map.put("c_time", String.valueOf(System.currentTimeMillis())); map.put("ver", version); map.put("en", "e_cr"); map.put("p1", platformName); map.put("sdk", sdkName); //创建url String url= buildUrl(map); // 发送url&将url加入到队列 SendDataMonitor.addSendUrl(url); System.out.println(url); return true; } else { log.log(Level.WARNING, "订单id和会员id不能为空"); return false; } } private static String buildUrl(Map<String, String> map) { StringBuffer stringBuffer=new StringBuffer(); stringBuffer.append(accessUrl).append("?"); for(Map.Entry<String, String> entry:map.entrySet()) { if (entry.getKey()!=null&&!entry.getKey().isEmpty()&&entry.getValue()!=null&&!entry.getValue().isEmpty()) { { try { stringBuffer.append(entry.getKey().trim()).append("=").append(URLEncoder.encode(entry.getValue().trim(),"utf-8")).append("&"); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } return stringBuffer.substring(0, stringBuffer.length() - 1); } }
SendDataMonitor 如下:
package com.kk.ae.sdk; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.ProtocolException; import java.net.URL; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; import java.util.logging.Logger; /** * 发送url数据的监控者,用于启动一个单独的线程来发送数据 * * @author gerry * */ public class SendDataMonitor { //收集日志 public static final Logger log=Logger.getGlobal(); // 队列,用户存储发送url public static final BlockingQueue<String> queue=new LinkedBlockingQueue<String>(); //用于单例的一个类对象 private static SendDataMonitor monitor=null; private SendDataMonitor() { // 私有构造方法,进行单列模式的创建 } public static SendDataMonitor getMonitor() { if (monitor==null) { synchronized (SendDataMonitor.class) { if (monitor==null) { monitor=new SendDataMonitor(); Thread thread=new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub SendDataMonitor.monitor.run(); } }); thread.start(); } } } return monitor; } protected void run() { while (true) { try { String url=this.queue.take(); // 正式的发送url HttpRequestUtil.sendData(url); } catch (Throwable e) { log.log(Level.WARNING, "发送url异常", e); } } } public static void setMonitor(SendDataMonitor monitor) { SendDataMonitor.monitor = monitor; } /** * 添加一个url到队列中去 * * @param url * @throws InterruptedException */ public static void addSendUrl(String url) throws InterruptedException { getMonitor().queue.put(url); } /** * 内部类,用户发送数据的http工具类 * * @author gerry * */ public static class HttpRequestUtil{ /** * 具体发送url的方法 * * @param url * @throws IOException */ public static void sendData(String url) throws IOException { HttpURLConnection con=null; BufferedReader bf=null; try { URL obj=new URL(url); con=(HttpURLConnection) obj.openConnection(); // 设置连接参数 con.setConnectTimeout(5000);//连接过期时间 con.setReadTimeout(5000);//读取数据过期时间 con.setRequestMethod("GET");//设置请求类型为get System.out.println("发送url:" + url); // 发送连接请求 bf=new BufferedReader(new InputStreamReader(con.getInputStream())); } finally { try { if (bf!=null) { bf.close(); } } catch (Throwable e) { // TODO: handle exception } try { con.disconnect(); } catch (Throwable e) { // TODO: handle exception } } } } }
测试类:
package com.kk.ae.sdk; public class Test { public static void main(String[] args) { try { AnalyticsEngineSDK.chargeSuccess("order3516", "0958"); AnalyticsEngineSDK.chargeRefund("kk3", "9009"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }