python监控服务器应用日志,推送钉钉机器人,实时关注日志异常
生产环境多台服务器上部署了多个应用,日志出现报错时,无法及时反馈到开发人员。部署一个大型的运维监控应用,不但耗资源,而且配置也不简单。
简简单单写个python脚本来监控服务器日志就简单多了,废话不多说,直接上脚本。
主要逻辑:
1. 使用python的subprocess模块,执行shell命令,“tail -f” 来监听日志文件
2. 对输出的日志文件,逐行比对字符串,如果匹配到预设的字符串则开始记录,keywords中配置需要预设的异常字符串
3. 开始记录日志至数组中,可通过rows来预设记录多少条,达到规定数量后
4. 将日志内容通过http发送给钉钉服务器,钉钉机器人就会在群里通知我们啦
简单,高效,实时
#!/usr/bin/python # encoding=utf-8 # Filename: monitorLog.py import os import signal import subprocess import time import smtplib from email.mime.text import MIMEText from email.header import Header import requests import json import threading serverName="prod:192.168.100.20" #日志文件路径地址 logFiles =[ "/opt/app/mall/mall-admin/logs/mall-admin-0.0.1-SNAPSHOT.log", "/opt/app/mall/mall-portal/logs/mall-portal-0.0.1-SNAPSHOT.log" ]; #机器人回调地址 , 钉钉群里添加一个机器人就可以了 webhook = 'https://oapi.dingtalk.com/robot/send?access_token=38670255eb92cd3a8ce94732fd785dc753134fc1c4d69141f56e942a5d453e5b' #日志缓存数据 logs = [] #记录日志行数,从匹配行开始记录,到达数量就用推给钉钉机器人 rows = 2 # 是否开始记录日志标记位 recordFlags = [] # 检测的关键字数组,大小写敏感 keywords = [] keywords.append("threw exception") keywords.append("java.lang.NullPointerException") keywords.append("com.macro.mall.exception.ApiRRException") keywords.append("SocketTimeoutException") #消息内容,url地址 def dingtalk(msg,webhook): headers = {'Content-Type': 'application/json; charset=utf-8'} data = {'msgtype': 'text', 'text': {'content': msg}, 'at': {'atMobiles': [], 'isAtAll': False}} post_data = json.dumps(data) response = requests.post(webhook, headers=headers, data=post_data) return response.text # 检查行 def checkLine(line, index): curLog = logs[index] if recordFlags[index]==1: curLog.append(line) if len(curLog)>rows: content = "\n".join('%s' %id for id in curLog) print(content) dingtalk(content,webhook) recordFlags[index] = 0 logs[index]=[] else: for keyword in keywords: if keyword in line: recordFlags[index] = 1 curLog.append(serverName) curLog.append(logFiles[index]) curLog.append(line) break #日志文件一般是按天产生,则通过在程序中判断文件的产生日期与当前时间,更换监控的日志文件 #程序只是简单的示例一下,监控test1.log 10秒,转向监控test2.log def monitorLog(logFile, index): #print('监控的日志文件 是%s' % logFile) popen = subprocess.Popen('tail -f ' + logFile, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) pid = popen.pid print('Popen.pid:' + str(pid)) while True: line = popen.stdout.readline().strip() # 判断内容是否为空 if line: #print line checkLine(str(line), index) if __name__ == '__main__': for index, logFile in enumerate(logFiles): logs.append([]) recordFlags.append(0) tt = threading.Thread(target=monitorLog,args=(logFile,index)) tt.start() print ("monitor:"+str(logFile))