消费阿里云日志服务SLS
此文档只关心消费接入,不关心日志接入,只关心消费如何接入,可直接跳转到【sdk消费接入]
SLS简介
- 日志服务:
- 日志服务(Log Service,简称 LOG)是针对日志类数据的一站式服务,在阿里巴巴集团经历大量大数据场景锤炼而成。您无需开发就能快捷完成日志数据采集、消费、投递以及查询分析等功能,提升运维、运营效率,建立 DT 时代海量日志处理能力
- 功能
- 实时采集与消费(LogHub)
- 投递数仓(LogShipper)
- 查询与实时分析(Search/Analytics)
接入消费流程
账号问题
- 如果消费自己的日志,直接使用自己阿里云账号的key
- 如果消费他人提供的日志,需要别人创建的子账号或者账号(推荐子账号,无安全问题)中的key,使用自己账号无法连接通
接入点EndPoint
- 公有云(弹外)
- 公有云(弹外)入口参见列表
消费接入(java)
概念
对象 | 明细 | |
---|---|---|
Log | 日志、日志组表示等基本概念 | |
Project | 项目 | |
Config | 配置 | |
LogStore | 日志库 | |
Index | 索引 | |
Shard | 分区 | |
ConsumerGroup | 消费组 |
配置
就如同使用 API 和日志服务服务端交互一样,使用 SDK 也需要指定一些基本配置。目前,所有语言的 SDK 都定义了一个 Client 类作为入口类,这些基本配置信息在该入口类的构造时指定。
具体包括如下几项:
- 服务入口(Endpoint):确认 Client 需要访问的服务入口
- 当使用 SDK 时,首先需要明确访问的日志服务 Project 所在 Region(如“华东 1 (杭州)”、“华北 1 (青岛)”等),然后选择与其匹配的日志服务入口初始化 Client。该服务入口与 API 中的 服务入口 定义一致
- 当选择 Client 的 Endpoint 时,必须要保证您需要访问的 Project 的 Region 和 Endpoint 对应的 Region 一致,否则 SDK 将无法访问您指定的 Project
- 由于 Client 实例只能在构造时指定该服务入口,如果需要访问不同 Region 里的 Project,则需要用不同的 Endpoint 构建不同的 Client 实例
- 目前,所有 API 的服务入口仅支持 HTTP 协议。
- 如果在阿里云 ECS 虚拟机内使用 SDK,您还可以使用内网 Endpoint 避免公网带宽开销,具体请参考 服务入口
- 阿里云访问秘钥(AccessKeyId/AccessKeySecret):指定 Client 访问日志服务时使用的访问秘钥
skd消费接入
原始接入
- 参见参考文档,要消费日志服务中的数据,请尽量不要直接使用SDK的拉数据接口,我们提供了一个高级消费库消费组消费,该库屏蔽了日志服务的实现细节,并且提供了负载均衡、按序消费等高级功能
消费组接入
- 同一个消费组下面的消费者名称必须不同,否则相同的消费者会同时消费logstore同份数据,造成数据重复
- 协同消费库(Consumer Library)是对日志服务中日志进行消费的高级模式,提供了消费组(ConsumerGroup)的概念对消费端进行抽象和管理,和直接使用SDK进行数据读取的区别在于,用户无需关心日志服务的实现细节,只需要专注于业务逻辑,另外,消费者之间的负载均衡、failover等用户也都无需关心
- 消费组(ConsumerGroup)
- 一个消费组由多个消费者构成,同一个消费组下面的消费者共同消费一个logstore中的数据,消费者之间不会重复消费数据
- 消费组(Consumer)
- 消费组的构成单元,实际承担消费任务,同一个消费组下面的消费者名称必须不同
- shared消费组、消费组关系
- 一个logstore下面会有多个shard,协同消费库的功能就是将shard分配给一个消费组下面的消费者
- 每个shard只会分配到一个消费者
- 一个消费者可以同时拥有多个shard
- 新的消费者加入一个消费组,这个消费组下面的shard从属关系会调整,以达到消费负载均衡的目的,但是上面的分配原则不会变,分配过程对用户透明
- 一个logstore下面会有多个shard,协同消费库的功能就是将shard分配给一个消费组下面的消费者
-
maven
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-client-lib</artifactId>
<version>0.6.15</version>
</dependency>
阿里云client依赖log4j,如果项目中使用的logback,需要增加转换log4j到logback的转换
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.7.25</version>
</dependency>
java文件
- main
public class Main {
// 日志服务域名,根据实际情况填写
private static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
// 日志服务项目名称,根据实际情况填写
private static String sProject = "ali-cn-hangzhou-sls-admin";
// 日志库名称,根据实际情况填写
private static String sLogstore = "sls_operation_log";
// 消费组名称,根据实际情况填写
private static String sConsumerGroup = "consumerGroupX";
// 消费数据的ak,根据实际情况填写
private static String sAccessKeyId = "";
private static String sAccessKey = "";
public static void main(String []args) throws LogHubClientWorkerException, InterruptedException
{
// 第二个参数是消费者名称,同一个消费组下面的消费者名称必须不同,可以使用相同的消费组名称,不同的消费者名称在多台机器上启动多个进程,来均衡消费一个Logstore,这个时候消费者名称可以使用机器ip来区分。第9个参数(maxFetchLogGroupSize)是每次从服务端获取的LogGroup数目,使用默认值即可,如有调整请注意取值范围(0,1000]
LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_1", sEndpoint, sProject, sLogstore, sAccessKeyId, sAccessKey, LogHubConfig.ConsumePosition.BEGIN_CURSOR);
ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
Thread thread = new Thread(worker);
//Thread运行之后,Client Worker会自动运行,ClientWorker扩展了Runnable接口。
thread.start();
Thread.sleep(60 * 60 * 1000);
//调用worker的Shutdown函数,退出消费实例,关联的线程也会自动停止。
worker.shutdown();
//ClientWorker运行过程中会生成多个异步的Task,Shutdown之后最好等待还在执行的Task安全退出,建议sleep 30s。
Thread.sleep(30 * 1000);
}
}
- SampleLogHubProcessor
public class SampleLogHubProcessor implements ILogHubProcessor
{
private int mShardId;
// 记录上次持久化 check point 的时间
private long mLastCheckTime = 0;
public void initialize(int shardId)
{
mShardId = shardId;
}
// 消费数据的主逻辑,这里面的所有异常都需要捕获,不能抛出去。
public String process(List<LogGroupData> logGroups,
ILogHubCheckPointTracker checkPointTracker)
{
// 这里简单的将获取到的数据打印出来
for(LogGroupData logGroup: logGroups){
FastLogGroup flg = logGroup.GetFastLogGroup();
System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
System.out.println("Tags");
for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
FastLogTag logtag = flg.getLogTags(tagIdx);
System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
}
for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
FastLog log = flg.getLogs(lIdx);
System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
FastLogContent content = log.getContents(cIdx);
System.out.println(content.getKey() + "\t:\t" + content.getValue());
}
}
}
long curTime = System.currentTimeMillis();
// 每隔 30 秒,写一次 check point 到服务端,如果 30 秒内,worker crash,
// 新启动的 worker 会从上一个 checkpoint 其消费数据,有可能有少量的重复数据
if (curTime - mLastCheckTime > 30 * 1000)
{
try
{
//参数true表示立即将checkpoint更新到服务端,为false会将checkpoint缓存在本地,后台默认隔60s会将checkpoint刷新到服务端。
checkPointTracker.saveCheckPoint(true);
}
catch (LogHubCheckPointException e)
{
e.printStackTrace();
}
mLastCheckTime = curTime;
}
return null;
}
// 当 worker 退出的时候,会调用该函数,用户可以在此处做些清理工作。
public void shutdown(ILogHubCheckPointTracker checkPointTracker)
{
//将消费断点保存到服务端。
try {
checkPointTracker.saveCheckPoint(true);
} catch (LogHubCheckPointException e) {
e.printStackTrace();
}
}
}
class SampleLogHubProcessorFactory implements ILogHubProcessorFactory
{
public ILogHubProcessor generatorProcessor()
{
// 生成一个消费实例
return new SampleLogHubProcessor();
}
}
- 上述代码,工厂类可以用lambda替换
- client继承Runnable,必须以thread方式启动
- client原理:是启动线程,底层定时发送心跳给服务端,拿到要消费的必要信息,异步提交http请求任务(线程池),请求处理数据。所以调用client.shutdown,方法并不能立马把所有任务关闭,最好有个时间差,同时client中运行线程标记是否关闭的变量不是线程安全的,所以关闭的时候,依然有可能提交请求任务处理
错误处理
- SDK 可能出现的异常错误可以分成如下几类:
- 由日志服务端返回的错误。这类错误由日志服务端返回并由 SDK 处理。关于这类错误的详细细节可以参考日志服务 API 的通用错误码和各个 API 接口的具体说明。
- 由 SDK 在向服务端发出请求时出现的网络错误。这类错误包括网络连接不通,服务端返回超时等。日志服务内部并未对此做任何重试逻辑,所以,您在使用 SDK 时需要自己定义相应的处理逻辑(重试请求或者直接报错等)
- 由 SDK 自身产生的、与平台及语言相关的错误,如内存溢出等。
- 目前,各个语言 SDK 的实现都采取抛出异常的方式处理错误。具体原则如下:
- 由如上第一或者第二类错误将会被 SDK 处理并包装在统一的 LogException 类抛出给用户处理
- 由如上第三类错误不会被 SDK 处理,而是直接抛出平台及语言的 Native Exception 类给用户处理
- API错误重试
- 在ILogHubProcessor的process方法中,方法返回空表示正常处理数据, 如果需要回滚到上个check point的点进行重试的话,可以return checkPointTracker.getCheckpoint(),但是这里有可能会造成重复消费
- 自己增加重试策略,避免重复消费