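Cassandra & Presto Cluster Deployment
Deploying a Cassandra cluster and the Presto SQL query engine on four servers, with code samples.
Many details are still missing; if you run into problems or have questions, let's discuss them together!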
1. Create the user
Log in to each application server as root and run:
adduser cassandra
passwd cassandra
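Log in to the application servers as the cassandra user and check that Python 2 and JDK 8 are installed. (I use JDK 8 because in this project Cassandra is used together with the Presto query engine, and Presto requires JDK 8.)
Python: Linux machines usually ship with Python; if it is missing, install it yourself.
JDK:
Perform the following steps on all four Linux servers:
As root, upload jdk-8u92-linux-x64.tar.gz to the /usr/local directory.
Extract the JDK:
cd /usr/local
tar -zxvf jdk-8u92-linux-x64.tar.gz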
Edit /etc/profile:
vi /etc/profile
Append the following lines at the end of the file:
#set java environment
JAVA_HOME=/usr/local/jdk1.8.0_92/
PATH=$JAVA_HOME/bin:$PATH
CLASSPATH=$JAVA_HOME/jre/lib:$JAVA_HOME/lib
export JAVA_HOME PATH CLASSPATH
Reload the file so the changes take effect (afterwards, java -version should report 1.8.0_92):
source /etc/profile
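If you prefer to confirm the JDK from Java itself rather than by reading java -version, a tiny class like the one below can be compiled and run with the newly installed JDK. JdkCheck is a hypothetical name; the check relies on Java 8 reporting java.specification.version as "1.8".

```java
// Hypothetical helper: confirms that the JVM picked up via /etc/profile is Java 8,
// which the article's Presto setup requires.
final class JdkCheck {

    // Java 8 reports java.specification.version as "1.8"; Java 9+ report "9", "11", ...
    static boolean isJava8(String specVersion) {
        return "1.8".equals(specVersion);
    }

    public static void main(String[] args) {
        String spec = System.getProperty("java.specification.version");
        System.out.println("java.home = " + System.getProperty("java.home"));
        System.out.println("Java 8: " + isJava8(spec));
    }
}
```

Run it with javac JdkCheck.java && java JdkCheck; java.home should point into /usr/local/jdk1.8.0_92.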
2. Create the upgrade directories
As the cassandra user, run:
mkdir -p /home/cassandra/product
mkdir -p /home/cassandra/install/`date +%C%y%m%d`/
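Upload the upgrade files apache-cassandra-2.1.16-bin.tar.gz, presto-server-0.165.tar.gz, etc.tar.gz and presto-cli-0.165-executable.jar to the upgrade directory on each application server of the target environment: /home/cassandra/install/`date +%C%y%m%d`/
(`date +%C%y%m%d` expands to the current date, e.g. 20180104; if you continue on another day, cd into the directory that was actually created instead of re-evaluating date.)
Extract the Presto configuration package: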
cd /home/cassandra/install/`date +%C%y%m%d`/
tar -zxvf etc.tar.gz
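The install path relies on `date +%C%y%m%d`, which concatenates the century (%C), the two-digit year (%y), the month and the day, i.e. a yyyyMMdd stamp such as 20180104. The sketch below builds the same path in Java, which may help if a Java-based deployment script needs to locate the directory; InstallDir is a hypothetical name.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper mirroring /home/cassandra/install/`date +%C%y%m%d`/
final class InstallDir {
    static final String BASE = "/home/cassandra/install/";
    // %C%y%m%d == four-digit year + two-digit month + two-digit day
    private static final DateTimeFormatter STAMP = DateTimeFormatter.ofPattern("yyyyMMdd");

    static String forDate(LocalDate day) {
        return BASE + day.format(STAMP) + "/";
    }

    static String today() {
        return forDate(LocalDate.now());
    }
}
```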
Server roles (UAT environment):
*.*.178.131 (Cassandra seed node) (Presto coordinator)
*.*.178.132 (Cassandra seed node) (Presto worker)
*.*.178.133 (Cassandra regular node) (Presto worker)
*.*.178.134 (Cassandra regular node) (Presto worker)
3. Install Cassandra:
Log in to all four application servers as the cassandra user and run:
cd /home/cassandra/install/`date +%C%y%m%d`/
tar -zvxf apache-cassandra-2.1.16-bin.tar.gz -C /home/cassandra/product/
cd /home/cassandra/product/apache-cassandra-2.1.16/conf
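sed -i 's/seeds: "127.0.0.1"/seeds: "SEED_IP1,SEED_IP2"/g' cassandra.yaml
(Replace SEED_IP1,SEED_IP2 with the seed node IPs of the target environment; choose stable servers as seeds.)
sed -i 's/listen_address: localhost/listen_address: SERVER_IP/g' cassandra.yaml
(Replace SERVER_IP with the IP address of the server you are configuring.)
sed -i 's/rpc_address: localhost/rpc_address: SERVER_IP/g' cassandra.yaml
(Again, replace SERVER_IP with this server's own IP address.)
sed -i 's/authenticator: AllowAllAuthenticator/authenticator: PasswordAuthenticator/g' cassandra.yaml
(This enables username/password login; the default superuser is cassandra/cassandra.)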
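The four sed commands are literal text substitutions. If you would rather script the edit in Java, for example to fill in each server's IP from an inventory, a minimal sketch that performs the same replacements on the file contents could look like this. CassandraYamlEdit and its parameters are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: applies the same literal substitutions as the sed commands above.
final class CassandraYamlEdit {

    static String apply(String yaml, String seeds, String serverIp) {
        return yaml
            .replace("seeds: \"127.0.0.1\"", "seeds: \"" + seeds + "\"")
            .replace("listen_address: localhost", "listen_address: " + serverIp)
            .replace("rpc_address: localhost", "rpc_address: " + serverIp)
            .replace("authenticator: AllowAllAuthenticator", "authenticator: PasswordAuthenticator");
    }

    // Rewrites cassandra.yaml in place, like sed -i
    static void applyInPlace(Path yamlFile, String seeds, String serverIp) throws IOException {
        String original = new String(Files.readAllBytes(yamlFile), StandardCharsets.UTF_8);
        Files.write(yamlFile, apply(original, seeds, serverIp).getBytes(StandardCharsets.UTF_8));
    }
}
```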
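(The JVM heap is important: the data Cassandra is working on, such as memtables, is held there. The recommended maximum is 16G. cassandra-env.sh requires MAX_HEAP_SIZE and HEAP_NEWSIZE to be set as a pair.)
sed -i 's/#MAX_HEAP_SIZE="4G"/MAX_HEAP_SIZE="8G"/g' cassandra-env.sh
sed -i 's/#HEAP_NEWSIZE="800M"/HEAP_NEWSIZE="1600M"/g' cassandra-env.sh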
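When the two heap lines stay commented out, cassandra-env.sh derives them on its own. The sketch below reproduces that calculation as I recall it from the 2.1 script, to show where values like 8G/1600M come from; treat the formula as an assumption and compare it with the comments in your own cassandra-env.sh. CassandraHeapSizing is a hypothetical name.

```java
// Sketch of the automatic sizing cassandra-env.sh performs when MAX_HEAP_SIZE and
// HEAP_NEWSIZE are left commented out (recalled from the 2.1 script; verify against your copy).
final class CassandraHeapSizing {

    // MAX_HEAP_SIZE = max(min(1/2 RAM, 1024 MB), min(1/4 RAM, 8192 MB))
    static long defaultMaxHeapMb(long systemMemoryMb) {
        long half = systemMemoryMb / 2;
        long quarter = half / 2; // the script halves the half, using integer division
        return Math.max(Math.min(half, 1024), Math.min(quarter, 8192));
    }

    // HEAP_NEWSIZE = min(100 MB per CPU core, 1/4 of MAX_HEAP_SIZE)
    static long defaultNewSizeMb(long maxHeapMb, int cpuCores) {
        return Math.min(100L * cpuCores, maxHeapMb / 4);
    }
}
```

With 64 GB of RAM the automatic heap already comes out at 8192 MB, and an 8192 MB heap on a 16-core machine gives a 1600 MB young generation, the same pair of values the sed commands set explicitly (the article does not state how many cores its servers have).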
cd /home/cassandra/product/apache-cassandra-2.1.16/bin
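sed -i 's/DEFAULT_REQUEST_TIMEOUT_SECONDS = 10/DEFAULT_REQUEST_TIMEOUT_SECONDS = 3600/g' cqlsh
(This raises cqlsh's default request timeout from 10 to 3600 seconds so long-running queries are not cut off.)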
Start the servers:
Start the seed nodes first. On *.*.178.131 and *.*.178.132, run:
cd /home/cassandra/product/apache-cassandra-2.1.16/bin
./cassandra -p cassandra.pid
Output such as INFO 06:22:29 Node /<machine IP> state jump to NORMAL indicates that the node started successfully.
Then run the same commands on *.*.178.133 and *.*.178.134:
cd /home/cassandra/product/apache-cassandra-2.1.16/bin
./cassandra -p cassandra.pid
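Output such as INFO 06:22:29 Node /<machine IP> state jump to NORMAL indicates a successful start; press Enter to get the shell prompt back.
After all four Cassandra servers are up, check the cluster status. UN (Up/Normal) in the first column means the node is healthy: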
cd /home/cassandra/product/apache-cassandra-2.1.16/bin
[cassandra@hadoop01 bin]$ ./nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address Load Tokens Owns (effective) Host ID Rack
UN *.*.178.131 159.37 KB 256 64.1% 6056ee11-f932-4a7c-a13e-632e143e2cbf rack1
UN *.*.178.132 141.05 KB 256 66.7% d385b5d5-638d-4d9a-ae76-30b15846864d rack1
UN *.*.178.133 159.72 KB 256 69.2% 262c33b7-bf6e-4dfc-b2d0-3a90cbe66622 rack1
UN *.*.178.134 159.72 KB 256 67.1% 262c33b7-bf6e-4dfc-b2d0-3a90cbe66622 rack1
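Checking that every node reports UN can be automated, for example in a post-deployment script. The Java sketch below scans nodetool status output line by line and returns the addresses of nodes that are not Up/Normal. NodetoolStatusCheck is a hypothetical name, and it assumes the column layout shown above (state code first, address second).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: returns the addresses of nodes in `nodetool status` output
// whose two-letter state code is anything other than UN (Up/Normal).
final class NodetoolStatusCheck {

    static List<String> notUpNormal(List<String> statusLines) {
        List<String> bad = new ArrayList<>();
        for (String line : statusLines) {
            String[] cols = line.trim().split("\\s+");
            // Node lines start with U/D (up/down) followed by N/L/J/M (normal/leaving/joining/moving);
            // header lines such as "Datacenter:" or "Status=Up/Down" do not match this pattern
            if (cols.length >= 2 && cols[0].matches("[UD][NLJM]") && !cols[0].equals("UN")) {
                bad.add(cols[1]);
            }
        }
        return bad;
    }
}
```

The input lines could come, for instance, from new ProcessBuilder("./nodetool", "status") started in the bin directory and read through the process's input stream; an empty result means every node is UN.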
4. Install Presto:
Log in to all four application servers as the cassandra user and run:
cd /home/cassandra/install/`date +%C%y%m%d`
cp presto-cli-0.165-executable.jar /home/cassandra/product/
cd /home/cassandra/product
mv presto-cli-*.jar presto-cli
chmod 777 presto-cli
cd /home/cassandra/install/`date +%C%y%m%d`
tar -zvxf presto-server-0.165.tar.gz -C /home/cassandra/product/
cp -r etc /home/cassandra/product/presto-server-0.165/
mkdir /home/cassandra/product/presto-server-0.165/prestodata
① Edit the catalog properties:
cd /home/cassandra/product/presto-server-0.165/etc/catalog
sed -i 's/cassandra.contact-points=200.31.156.206,200.31.147.207/cassandra.contact-points=*.*.178.131,*.*.178.132/g' cassandra.properties
(Replace *.*.178.131,*.*.178.132 with the seed node IPs of the target environment.)
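The article only patches the contact-points line of the cassandra.properties shipped in etc.tar.gz, whose full contents are not shown. For reference, a Presto Cassandra catalog needs at least connector.name=cassandra and cassandra.contact-points; the hypothetical helper below renders a file with just those two keys.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: renders a minimal etc/catalog/cassandra.properties.
final class CassandraCatalog {

    static Properties build(List<String> contactPoints) {
        Properties p = new Properties();
        p.setProperty("connector.name", "cassandra");                                // Presto's Cassandra connector
        p.setProperty("cassandra.contact-points", String.join(",", contactPoints)); // seed node IPs
        return p;
    }

    // Properties.store also writes a timestamp comment line at the top of the output
    static String render(Properties p) throws IOException {
        StringWriter out = new StringWriter();
        p.store(out, null);
        return out.toString();
    }
}
```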
② Edit config.properties:
cd /home/cassandra/product/presto-server-0.165/etc
# On the three Presto worker nodes (UAT: *.*.178.132, *.*.178.133 and *.*.178.134), run:
sed -i 's/coordinator=true/coordinator=false/g' config.properties
sed -i '/node-scheduler.include-coordinator/d' config.properties
sed -i '/discovery-server.enabled/d' config.properties
# On all four servers (UAT: *.*.178.131, *.*.178.132, *.*.178.133, *.*.178.134), as the cassandra user, run:
sed -i 's/discovery.uri=http:\/\/200.31.147.10:8089/discovery.uri=http:\/\/*.*.178.131:8089/g' config.properties
(Replace *.*.178.131 with the coordinator's address; in UAT it is *.*.178.131.)
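After these edits the coordinator and the workers differ in only a few keys. The sketch below models that difference with java.util.Properties. PrestoNodeConfig is hypothetical, http-server.http.port=8089 is inferred from the discovery URI, and the coordinator-only values (node-scheduler.include-coordinator=false, discovery-server.enabled=true) are assumptions, because the original config.properties is not shown.

```java
import java.util.Properties;

// Hypothetical model of config.properties for the coordinator and for a worker.
// Coordinator-only values are assumptions: the file inside etc.tar.gz is not shown.
final class PrestoNodeConfig {

    static Properties build(boolean coordinator, String coordinatorIp, int httpPort) {
        Properties p = new Properties();
        p.setProperty("coordinator", Boolean.toString(coordinator));
        p.setProperty("http-server.http.port", Integer.toString(httpPort));
        // Every node registers with the discovery service running on the coordinator
        p.setProperty("discovery.uri", "http://" + coordinatorIp + ":" + httpPort);
        if (coordinator) {
            // Workers have these two lines deleted by the sed '/.../d' commands
            p.setProperty("node-scheduler.include-coordinator", "false"); // assumed value
            p.setProperty("discovery-server.enabled", "true");            // embedded discovery service
        }
        return p;
    }
}
```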
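sed -i 's/query.max-memory=30GB/query.max-memory=30GB/g' config.properties
(Change the replacement value 30GB to 60% of physical memory; check physical memory with free -g.)
sed -i 's/query.max-memory-per-node=20GB/query.max-memory-per-node=20GB/g' config.properties
(Change the replacement value 20GB to 40% of physical memory; check physical memory with free -g.)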
③ Edit node.properties:
sed -i 's/node.id=ffffffff-ffff-ffff-ffff-fffffffffff1/node.id=ffffffff-ffff-ffff-ffff-fffffffffff2/g' node.properties
(Run this only on the three worker nodes, using fffffffffff2, fffffffffff3 and fffffffffff4 respectively.)
node.id values: the coordinator uses ffffffff-ffff-ffff-ffff-fffffffffff1; the three workers use ffffffff-ffff-ffff-ffff-fffffffffff2, ffffffff-ffff-ffff-ffff-fffffffffff3 and ffffffff-ffff-ffff-ffff-fffffffffff4. Every node must have a unique node.id.
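The node.id scheme above simply appends the node number to a fixed prefix. The hypothetical helper below reproduces it and, as an alternative, generates a random UUID. Presto requires node.id to be unique per node and stable across restarts, so a random value must be generated once per node and then kept in node.properties.

```java
import java.util.UUID;

// Hypothetical helper for node.properties: reproduces the article's node.id scheme
// and offers a random UUID as an alternative (generate once per node, then keep it).
final class PrestoNodeIds {
    private static final String PREFIX = "ffffffff-ffff-ffff-ffff-fffffffffff";

    // Node 1 = coordinator, nodes 2..4 = workers; the scheme only has room for one digit
    static String articleNodeId(int node) {
        if (node < 1 || node > 9) {
            throw new IllegalArgumentException("node must be 1..9, got " + node);
        }
        return PREFIX + node;
    }

    static String randomNodeId() {
        return UUID.randomUUID().toString();
    }
}
```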
④ Edit jvm.config:
sed -i 's/-Xmx40G/-Xmx20G/g' jvm.config
(Change the replacement value 20G to 70% of physical memory; check physical memory with free -g.)
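The three percentages above are simple arithmetic on the total reported by free -g. The hypothetical helper below applies them (rounding down to whole GB) and prints the values in the form the sed commands write them.

```java
// Hypothetical helper applying the article's rules of thumb to the total memory
// reported by `free -g` (whole GB, rounded down):
//   query.max-memory = 60%, query.max-memory-per-node = 40%, -Xmx = 70%
final class PrestoMemorySizing {

    static long queryMaxMemoryGb(long physicalGb)        { return physicalGb * 60 / 100; }
    static long queryMaxMemoryPerNodeGb(long physicalGb) { return physicalGb * 40 / 100; }
    static long jvmXmxGb(long physicalGb)                { return physicalGb * 70 / 100; }

    // Renders the three settings in the form the sed commands write them
    static String render(long physicalGb) {
        return "query.max-memory=" + queryMaxMemoryGb(physicalGb) + "GB\n"
             + "query.max-memory-per-node=" + queryMaxMemoryPerNodeGb(physicalGb) + "GB\n"
             + "-Xmx" + jvmXmxGb(physicalGb) + "G\n";
    }
}
```

For a 50 GB server this yields exactly the 30GB and 20GB placeholders used in the sed commands, plus -Xmx35G. Keeping the per-node query limit (40%) well below the heap (70%) leaves heap room for everything else Presto keeps in memory.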
Start the servers:
Log in to all four servers as the cassandra user (UAT: *.*.178.131, *.*.178.132, *.*.178.133, *.*.178.134) and run:
cd /home/cassandra/product/presto-server-0.165/bin
./launcher start
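launcher start returns immediately, so it can be useful to confirm that each service is actually listening. The minimal Java sketch below (PortCheck is a hypothetical name) attempts a TCP connection with a timeout. Use port 8089 for Presto and 9042, Cassandra's default native transport port, for Cassandra; since rpc_address was set to each server's IP, connect to that IP rather than to localhost.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: returns true if something accepts TCP connections on host:port.
final class PortCheck {

    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // connection refused, timed out or host unreachable
            return false;
        }
    }
}
```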
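5. You can now log in to Presto or Cassandra and query with SQL
Connect to Cassandra with cqlsh:
cd /home/cassandra/product/apache-cassandra-2.1.16/bin
./cqlsh --request-timeout=3600 -u cassandra -p cassandra *.*.178.131
(*.*.178.131 is the coordinator node's IP; cqlsh can connect to any Cassandra node.)
Connect to Cassandra through Presto:
cd /home/cassandra/product
./presto-cli --server IP:8089 --catalog cassandra --schema dep_ntpi
(IP is the Presto coordinator's address; dep_ntpi is the keyspace name.)
Presto web UI: 172.17.193.13:8089, where you can monitor query duration, memory usage and other status.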
6. Code samples. Connecting to Cassandra from Java requires cassandra-driver-core-2.0.9.2.jar; the code is as follows.
a. Database connection utility class
package com.cfets.util;

import org.apache.log4j.Logger;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.Session;

public class CassandraHandle {
    private static final Logger logger = Logger.getLogger(CassandraHandle.class);
    private Cluster cluster;
    private Session session;
    private String[] hosts;
    // Eagerly created singleton; the constructor opens the connection pool
    private final static CassandraHandle cassandraHandle = new CassandraHandle();

    public static CassandraHandle getInstance() {
        return cassandraHandle;
    }

    /**
     * Initialize cassandra hosts.
     */
    private CassandraHandle() {
        hosts = new String[]{
            ReadDbConfigFile.getProperty("cassandra.Ip"),
            ReadDbConfigFile.getProperty("cassandra.Ip2"),
            ReadDbConfigFile.getProperty("cassandra.Ip3"),
            ReadDbConfigFile.getProperty("cassandra.Ip4")
        };
        getConnectPool(hosts);
    }

    /**
     * Get cassandra ConnectionPool
     * @author houyao
     * @date 2018-01-04 15:44:47
     * @param hosts contact points
     */
    private void getConnectPool(String[] hosts) {
        try {
            QueryOptions qo = new QueryOptions();
            // Consistency level: QUORUM = a majority of the replicas must respond
            qo.setConsistencyLevel(ConsistencyLevel.QUORUM);
            PoolingOptions poolingOptions = new PoolingOptions();
            // Concurrent requests per connection above which the driver opens more connections
            poolingOptions.setMaxSimultaneousRequestsPerConnectionThreshold(HostDistance.LOCAL, 32);
            poolingOptions.setCoreConnectionsPerHost(HostDistance.LOCAL, 2);
            poolingOptions.setMaxConnectionsPerHost(HostDistance.LOCAL, 4);
            cluster = Cluster.builder().addContactPoints(hosts)
                    .withQueryOptions(qo)
                    .withCredentials(ReadDbConfigFile.getProperty("cassandra.name"),
                                     ReadDbConfigFile.getProperty("cassandra.password"))
                    .withPoolingOptions(poolingOptions)
                    .build();
            this.session = cluster.connect();
        } catch (Exception e) {
            // logger.error already records the stack trace
            logger.error("CassandraHandle.getConnectPool() connection error: " + e.getMessage(), e);
        }
    }

    public Session getSession() {
        return session;
    }

    public void closeConn() {
        if (cluster != null) {
            cluster.close();
        }
    }
}
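CassandraHandle (and PrestoUtils further down) read their settings through ReadDbConfigFile, which the article does not show. A plausible stand-in based on java.util.Properties is sketched here; the class name DbConfig and the file layout are assumptions, while the keys (cassandra.Ip to cassandra.Ip4, cassandra.name, cassandra.password) come from the code above.

```java
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical stand-in for ReadDbConfigFile: loads key=value settings from a properties source.
final class DbConfig {
    private final Properties props = new Properties();

    DbConfig(Reader source) throws IOException {
        props.load(source);
    }

    String get(String key) {
        return props.getProperty(key);
    }

    // Collects cassandra.Ip, cassandra.Ip2 .. cassandra.Ip4, skipping missing or blank entries
    // so that no null host is handed to the driver
    String[] cassandraHosts() {
        List<String> hosts = new ArrayList<>();
        String[] keys = {"cassandra.Ip", "cassandra.Ip2", "cassandra.Ip3", "cassandra.Ip4"};
        for (String key : keys) {
            String value = props.getProperty(key);
            if (value != null && !value.trim().isEmpty()) {
                hosts.add(value.trim());
            }
        }
        return hosts.toArray(new String[0]);
    }
}
```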
b. Executing inserts in Cassandra
package com.cfets.parser;

import java.util.Date;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.log4j.Logger;

import cn.com.cfets.data.MetaObject;
import cn.com.cfets.data.coretransaction.fxoption.Quote;
import com.cfets.imt.DEPService;
import com.cfets.imt.common.AbstractParser;
import com.cfets.util.CassandraHandle;
import com.cfets.util.DateUtils;
import com.cfets.util.FileUtils;
import com.cfets.util.ReadConfigFile;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;

public class DataNTPI0021C0001 extends AbstractParser {
    private static final Logger logger = Logger.getLogger(DataNTPI0021C0001.class);

    // CQL used for the initial prepare and for re-preparing after a reconnect
    private static final String INSERT_CQL =
            "insert into DEP_NTPI.fx_optn_prc_dtl (ord_num, prdct_cd, prd, impld_vltlty, qt_cd, qt_tm, qt_st, "
            + "dt_cnfrm, cn_shrt_nm, en_shrt_nm, qtng_instn_cd, qt_instn_trdr_cd, ccy_pair_cd, trdng_md_cd, "
            + "qt_instn_lgl_grp_nm, qt_instn_cn_full_nm, qt_dir, qt_instn_cfets_instn_cd, qt_instn_en_full_nm, "
            + "qt_vrty, trdng_mthd_cd) "
            + "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);";

    // Incoming quotes; producers add to this queue and the worker thread swaps it out once per second
    private ConcurrentLinkedQueue<Quote> datas = new ConcurrentLinkedQueue<Quote>();
    // Starting ord_num offset from config key "timestamp": odd on the first server, even on the second
    private static int oddOreven = Integer.parseInt(ReadConfigFile.getProperty("timestamp"));

    public DataNTPI0021C0001() {
        new DealQThread().start();
    }

    // Inner worker thread: drains the queue every second and batch-inserts into Cassandra
    protected class DealQThread extends Thread {
        private ConcurrentLinkedQueue<Quote> queue;

        public void run() {
            Session session = null;
            BatchStatement bs = null;
            PreparedStatement statement = null;
            try {
                session = CassandraHandle.getInstance().getSession();
                statement = session.prepare(INSERT_CQL);
                while (true) {
                    try {
                        // Swap in an empty queue and drain the old one
                        synchronized (DataNTPI0021C0001.class) {
                            queue = datas;
                            datas = new ConcurrentLinkedQueue<Quote>();
                        }
                        if (session.isClosed()) {
                            session = CassandraHandle.getInstance().getSession();
                            statement = session.prepare(INSERT_CQL);
                        }
                        bs = new BatchStatement();
                        // ord_num = currentTimeMillis * 1000000 + timeStamp. timeStamp starts at oddOreven and
                        // grows by 2, so ids from the first server are odd and ids from the second are even.
                        int timeStamp = oddOreven;
                        long count;
                        while (!queue.isEmpty()) {
                            Quote message = queue.poll();
                            logger.info("DATA_NTPI_0021_C_0001 JSON:" + FileUtils.toJSONString(message));
                            count = System.currentTimeMillis() * 1000000;
                            timeStamp = timeStamp + 2;
                            bs.add(statement.bind(
                                    // count + timeStamp replaces DB.getSeqence("SEQ_FX_OPTN_PRC_DTL.NEXTVAL")
                                    count + timeStamp,
                                    message.isSetProductCode() ? message.getProductCode() : "",
                                    message.isSetPeriod() ? message.getPeriod() : "",
                                    message.isSetImpliedVolatility() ? message.getImpliedVolatility() + "" : "",
                                    message.isSetQuoteCode() ? message.getQuoteCode() : "",
                                    message.isSetQuoteTime() ? DateUtils.dateToString(message.getQuoteTime(), "yyyy-MM-dd HH:mm:ss") : "",
                                    message.isSetQuoteStatus() ? message.getQuoteStatus() : "",
                                    message.isSetDateConfirmed() ? DateUtils.dateToString(message.getDateConfirmed(), "yyyy/MM/dd") : "",
                                    message.isSetChineseShortName() ? message.getChineseShortName() : "",
                                    message.isSetEnglishShortName() ? message.getEnglishShortName() : "",
                                    message.isSetQuotingInstitutionCode() ? message.getQuotingInstitutionCode() : "",
                                    message.isSetQuoteInstitutionTraderCode() ? message.getQuoteInstitutionTraderCode() : "",
                                    message.isSetCurrencyPairCode() ? message.getCurrencyPairCode() : "",
                                    message.isSetTradingModeCode() ? message.getTradingModeCode() : "",
                                    message.isSetQuoteInstitutionLegalGroupName() ? message.getQuoteInstitutionLegalGroupName() : "",
                                    message.isSetQuoteInstitutionChineseFullName() ? message.getQuoteInstitutionChineseFullName() : "",
                                    message.isSetQuoteDirection() ? message.getQuoteDirection() : "",
                                    message.isSetQuoteInstitutionCfetsInstitutionCode() ? message.getQuoteInstitutionCfetsInstitutionCode() : "",
                                    message.isSetQuoteInstitutionEnglishFullName() ? message.getQuoteInstitutionEnglishFullName() : "",
                                    message.isSetQuoteVariety() ? message.getQuoteVariety() : "",
                                    message.isSetTradingMethodCode() ? message.getTradingMethodCode() : ""));
                            // Flush every 5000 statements
                            if (bs.size() >= 5000) {
                                session.execute(bs);
                                bs.clear();
                                timeStamp = oddOreven;
                            }
                        }
                        // Execute only if statements remain (avoids an empty batch right after a 5000-row flush)
                        if (bs.size() > 0) {
                            session.execute(bs);
                            logger.info("DATA_NTPI_0021_C_0001 data inserted successfully!");
                        }
                    } catch (Exception e1) {
                        logger.error("DATA_NTPI_0021_C_0001 data insert failed!", e1);
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // preserve the interrupt status
                        break;
                    }
                }
            } catch (Exception e2) {
                logger.error("DATA_NTPI_0021_C_0001 failed to get a Cassandra connection: " + e2.getMessage(), e2);
            } finally {
                try {
                    // Note: this closes the Session shared through CassandraHandle
                    if (session != null) {
                        session.close();
                    }
                } catch (Exception ex) {
                    logger.error("DATA_NTPI_0021_C_0001 error closing the Cassandra session: " + ex);
                }
            }
        }
    }

    // Other members (e.g. the AbstractParser method that adds Quote objects to datas) are omitted.
}
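The ord_num logic in DealQThread replaces an Oracle sequence with a timestamp-based id that two servers can generate without coordinating. The hypothetical class below isolates that logic. The start values -1 (first server, odd ids) and 0 (second server, even ids) are assumptions read from the code comments, since the actual timestamp config values are not shown.

```java
// Hypothetical extraction of the ord_num scheme used in DealQThread:
// id = currentTimeMillis * 1_000_000 + offset, where the offset starts at the configured
// "timestamp" value and grows by 2 per row, so it keeps the parity of its start value.
final class OrdNumGenerator {
    private final int start;
    private int offset;

    OrdNumGenerator(int start) {
        this.start = start;
        this.offset = start;
    }

    long next(long currentTimeMillis) {
        offset += 2;
        // currentTimeMillis * 1_000_000 is always even, so the id's parity equals the offset's parity
        return currentTimeMillis * 1_000_000L + offset;
    }

    // DealQThread restarts the offset at each drain cycle and after each 5000-row flush
    void reset() {
        offset = start;
    }
}
```

Because the offset stays around 10,000 or below (5000 rows × 2) while the millisecond part advances in steps of 1,000,000, ids from different milliseconds never overlap; a collision would require the offset to be reset and reach the same value again within a single millisecond.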
Querying through Presto requires presto-jdbc-0.100.jar.
c. Utility class for connecting to Cassandra through Presto
package com.cfets.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class PrestoUtils {
    static {
        try {
            // Register the Presto JDBC driver
            Class.forName("com.facebook.presto.jdbc.PrestoDriver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    public static Connection getConnection() {
        Connection conn = null;
        try {
            // presto.url is the JDBC URL of the coordinator; user and password reuse the
            // cassandra.name / cassandra.password keys read by CassandraHandle
            conn = DriverManager.getConnection(ReadDbConfigFile.getProperty("presto.url"),
                    ReadDbConfigFile.getProperty("cassandra.name"),
                    ReadDbConfigFile.getProperty("cassandra.password"));
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return conn;
    }

    // Closes ResultSet, Statement and Connection in reverse order of creation, skipping nulls
    public static void release(Connection conn, Statement ps, ResultSet rs) {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (ps != null) {
            try {
                ps.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
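PrestoUtils reads the JDBC URL from the presto.url key. Presto JDBC URLs take the form jdbc:presto://host:port/catalog/schema, so for this cluster the value would point at the coordinator on port 8089 with catalog cassandra and schema dep_ntpi. The tiny hypothetical helper below only assembles that string.

```java
// Hypothetical helper for the presto.url value read by PrestoUtils.
// Presto JDBC URLs have the form jdbc:presto://host:port/catalog/schema
final class PrestoUrl {

    static String of(String host, int port, String catalog, String schema) {
        return "jdbc:presto://" + host + ":" + port + "/" + catalog + "/" + schema;
    }
}
```

Since Connection, Statement and ResultSet are all AutoCloseable (Java 7 and later), callers can also manage them with try-with-resources instead of calling release().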
d. Presto query demo
package com.cfets.market;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.Date;
import java.sql.ResultSet;
import java.sql.Statement;
import java.text.SimpleDateFormat;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cfets.util.DB;
import com.cfets.util.PrestoUtils;
import com.cfets.vo.FxSptfwdswpMrktOrdrDpth;

/**
 * FX spot/forward/swap market depth (ODM) data, copied into the Oracle database
 * @author wangzl
 */
public class FxSptfwdswpMrktOrdrDpthOrcl {
    private final Logger logger = LoggerFactory.getLogger(FxSptfwdswpMrktOrdrDpthOrcl.class);

    /**
     * Query the Cassandra data through Presto, page by page
     * @param targetDate value matched against DT_CNFRM
     */
    public void loadDataFromPrestoFile(String targetDate) {
        long start = System.currentTimeMillis();
        Connection conn = null;
        Statement stmt = null;
        ResultSet rs = null;
        try {
            conn = PrestoUtils.getConnection();
            stmt = conn.createStatement();
            long num = 0; // last ORD_NUM processed
            logger.info("DATA-TSS-0012 FX spot/forward/swap market depth (ODM): start copying to Oracle");
            while (true) {
                // Keyset pagination: next page of rows with ORD_NUM > num. ORDER BY ORD_NUM keeps the pages
                // consecutive; with LIMIT alone Presto may return any 1,000,000 matching rows.
                String sql = "SELECT ORD_NUM,PRDCT_CD,DL_MKT_DT_TP,BID_RATE,ASK_RATE,PRD,MKT_DATA_UPD_TM,DT_CNFRM,"
                        + "CCY_PAIR_CD,TRDNG_MD_CD,CNTRCT_NM,TRDNG_MD,MKT_DPTH_INDCTR,CFETS_UNFD_CD,BID_QT_TM,"
                        + "ASK_QT_TM,BID_QT_AMNT,ASK_QT_AMNT FROM DEP_NTPI.FX_SWAP_MKT_ORDER_DEPTH"
                        + " WHERE ORD_NUM > " + num + " AND DT_CNFRM = '" + targetDate + "'"
                        + " ORDER BY ORD_NUM LIMIT 1000000";
                logger.info("DATA-TSS-0012 FX spot/forward/swap market depth (ODM): query SQL: " + sql);
                rs = stmt.executeQuery(sql);
                if (rs.next()) {
                    FxSptfwdswpMrktOrdrDpth fsmod = new FxSptfwdswpMrktOrdrDpth();
                    fsmod.setOrdNum(rs.getLong("ORD_NUM"));
                    fsmod.setPrdctCd(rs.getString("PRDCT_CD"));
                    fsmod.setDlMktDtTp(rs.getString("DL_MKT_DT_TP"));
                    fsmod.setBidRate(rs.getString("BID_RATE"));
                    fsmod.setAskRate(rs.getString("ASK_RATE"));
                    fsmod.setPrd(rs.getString("PRD"));
                    fsmod.setMktDataUpdTm(rs.getString("MKT_DATA_UPD_TM"));
                    fsmod.setDtCnfrm(rs.getString("DT_CNFRM"));
                    fsmod.setCcyPairCd(rs.getString("CCY_PAIR_CD"));
                    fsmod.setTrdngMdCd(rs.getString("TRDNG_MD_CD"));
                    fsmod.setCntrctNm(rs.getString("CNTRCT_NM"));
                    fsmod.setTrdngMd(rs.getString("TRDNG_MD"));
                    fsmod.setMktDpthIndctr(rs.getString("MKT_DPTH_INDCTR"));
                    fsmod.setCfetsUnfdCd(rs.getString("CFETS_UNFD_CD"));
                    fsmod.setBidQtTm(rs.getString("BID_QT_TM"));
                    fsmod.setAskQtTm(rs.getString("ASK_QT_TM"));
                    fsmod.setBidQtAmnt(rs.getString("BID_QT_AMNT"));
                    fsmod.setAskQtAmnt(rs.getString("ASK_QT_AMNT"));
                    long firstOrdNum = rs.getLong("ORD_NUM");
                    // toInsertDB (omitted) presumably writes fsmod and the remaining rows of rs to Oracle
                    // and returns the last ORD_NUM it processed
                    num = toInsertDB(fsmod, rs);
                    if (num == 0) { // prevent an infinite loop
                        num = firstOrdNum;
                    }
                } else {
                    break; // no more rows
                }
            }
            logger.info("DATA-TSS-0012 FX spot/forward/swap market depth (ODM): copy to Oracle finished, elapsed: "
                    + (System.currentTimeMillis() - start) + " ms");
        } catch (Exception e) {
            logger.error("DATA-TSS-0012 FX spot/forward/swap market depth (ODM): copy to Oracle failed: " + e.getMessage(), e);
        } finally {
            PrestoUtils.release(conn, stmt, rs);
        }
    }

    // toInsertDB(FxSptfwdswpMrktOrdrDpth, ResultSet) and the remaining members are omitted.
}
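loadDataFromPrestoFile walks the table with keyset pagination: each query asks for rows whose ORD_NUM is greater than the last one already copied. The in-memory sketch below (hypothetical KeysetPager, with a TreeSet standing in for the table) shows the loop with pages taken in ORD_NUM order, which visits every key exactly once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical in-memory model of the keyset pagination loop:
// repeatedly take the `pageSize` smallest keys greater than the last key seen.
final class KeysetPager {

    static List<List<Long>> pages(TreeSet<Long> keys, int pageSize) {
        List<List<Long>> result = new ArrayList<>();
        long last = 0; // same starting value as `num` in the demo
        while (true) {
            List<Long> page = new ArrayList<>();
            // tailSet(last, false) ~ "WHERE ORD_NUM > last"; ascending iteration ~ "ORDER BY ORD_NUM"
            for (Long key : keys.tailSet(last, false)) {
                if (page.size() == pageSize) {
                    break; // ~ "LIMIT pageSize"
                }
                page.add(key);
            }
            if (page.isEmpty()) {
                break; // no more rows
            }
            result.add(page);
            last = page.get(page.size() - 1); // continue after the last key of this page
        }
        return result;
    }
}
```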