Flume 自定义反序列化器(Deserializer)
需求背景:
在利用 Flume 收集日志时,错误日志中的异常堆栈会被打印成多行。需要把属于同一条日志的多行内容合并起来,包装成一个 event 进行传输。
解决思路:
上述需求可以通过自定义拦截器或自定义反序列化器来实现。网上关于自定义拦截器的资料比较多,但从拦截器的定位和使用场景来看,拦截器不应该用于拆分或合并多个 event;而且如果 Flume 存在并发处理,也无法保证拦截器读取到的 event 是有序的。查阅资料后发现,通过自定义 Flume 的反序列化器来实现更加合理和安全。
实现步骤:
1: 新建一个类,实现 EventDeserializer 接口;并在其中提供一个实现 EventDeserializer.Builder 接口的静态内部类 Builder,用于创建该反序列化器的实例
2: 实现 readEvent() 和 readEvents() 方法(以及 mark()、reset()、close() 等接口方法),其中多行合并的核心逻辑放在 readEvent() 中
3: 修改 Flume 的配置文件,将 source 的 deserializer 属性设置为自定义类的 Builder(内部类需写成 `外部类全名$Builder` 的形式)
源码:
1:自定义反序列化器 —> MyLineDeserializer
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.xxx.flume.serializer; import com.google.common.collect.Lists; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.annotations.InterfaceStability; import org.apache.flume.event.EventBuilder; import org.apache.flume.serialization.EventDeserializer; import org.apache.flume.serialization.ResettableInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.Charset; import java.util.List; /** * A deserializer that parses text lines from a file. 
*/ @InterfaceAudience.Private @InterfaceStability.Evolving public class MyLineDeserializer implements EventDeserializer { private static final Logger logger = LoggerFactory.getLogger (MyLineDeserializer.class); private final ResettableInputStream in; private final Charset outputCharset; private final int maxLineLength; private volatile boolean isOpen; public static final String OUT_CHARSET_KEY = "outputCharset"; public static final String CHARSET_DFLT = "UTF-8"; public static final String MAXLINE_KEY = "maxLineLength"; public static final int MAXLINE_DFLT = 2048; private StringBuffer eventStringBuffer = new StringBuffer(); MyLineDeserializer(Context context, ResettableInputStream in) { this.in = in; this.outputCharset = Charset.forName( context.getString(OUT_CHARSET_KEY, CHARSET_DFLT)); this.maxLineLength = context.getInteger(MAXLINE_KEY, MAXLINE_DFLT); this.isOpen = true; } /** * Reads a line from a file and returns an event * * @return Event containing parsed line * @throws IOException */ @Override public Event readEvent() throws IOException { ensureOpen(); String line = readLine(); Event event = null; while (line != null) { // start with 20 is one timestamp , event end if (line.trim().startsWith("20")) { event = EventBuilder.withBody(eventStringBuffer.toString(), outputCharset); eventStringBuffer.delete(0, eventStringBuffer.length()); } // add current line push to buffer if (line.trim().length() > 0) { if (eventStringBuffer.length() > 0) { eventStringBuffer.append(System.lineSeparator()).append(line); } else { eventStringBuffer.append(line); } } if (line.trim().startsWith("20")) { break; } line = readLine(); } if (line == null && eventStringBuffer.toString().length() > 0 ){ event = EventBuilder.withBody(eventStringBuffer.toString(), outputCharset); eventStringBuffer.delete(0, eventStringBuffer.length()); return event; } return event; } /** * Batch line read * * @param numEvents Maximum number of events to return. 
* @return List of events containing read lines * @throws IOException */ @Override public List<Event> readEvents(int numEvents) throws IOException { ensureOpen(); List<Event> events = Lists.newLinkedList(); for (int i = 0; i < numEvents; i++) { Event event = readEvent(); if (event != null) { events.add(event); } else { break; } } return events; } @Override public void mark() throws IOException { ensureOpen(); in.mark(); } @Override public void reset() throws IOException { ensureOpen(); in.reset(); } @Override public void close() throws IOException { if (isOpen) { reset(); in.close(); isOpen = false; } } private void ensureOpen() { if (!isOpen) { throw new IllegalStateException("Serializer has been closed"); } } // TODO: consider not returning a final character that is a high surrogate // when truncating private String readLine() throws IOException { StringBuilder sb = new StringBuilder(); int c; int readChars = 0; while ((c = in.readChar()) != -1) { readChars++; // FIXME: support \r\n if (c == '\n') { break; } sb.append((char) c); if (readChars >= maxLineLength) { logger.warn("Line length exceeds max ({}), truncating line!", maxLineLength); break; } } if (readChars > 0) { return sb.toString(); } else { return null; } } public static class Builder implements EventDeserializer.Builder { @Override public MyLineDeserializer build(Context context, ResettableInputStream in) { return new MyLineDeserializer(context, in); } } }
2: flume 配置文件
a1.sources.r1.deserializer = com.xxx.flume.serializer.MyLineDeserializer$Builder