基于go-cqhttp实现QQ机器人
本篇文章记录一下自己在编写QQ机器人的时候所遇到的一些问题和核心功能的实现。
QQ机器人RabbitBot采用python编写,由于是个人学习使用,故目前不会开源完整代码,只会放出核心代码供学习参考。
使用的go-cqhttp项目:https://github.com/Mrs4s/go-cqhttp
go-cqhttp是基于 Mirai 以及 MiraiGo 的 cqhttp golang 原生实现。
RabbitBot在读取、发送QQ信息时采用的是HTTP API和反向HTTP POST接口。
在这里也安利一波大佬同学Yang_99写的已经开源在github上的Yes酱,这么好的东西还不赶紧来star+fork吗
github项目地址:https://github.com/Yang9999999/Go-CQHTTP-YesBot
使用方法:
http://www.yang99.top/index.php/archives/18/
http://www.dtmao.cc/news_show_649533.shtml
初期配置
go-cqhttp的安装和完整配置可自行查看文档,这里描述一下关键部分。
配置文件:
{
uin: 0
password: 0
encrypt_password: false
password_encrypted: ""
enable_db: false
access_token: ""
relogin: {
enabled: true
relogin_delay: 3
max_relogin_times: 0
}
_rate_limit: {
enabled: false
frequency: 1
bucket_size: 1
}
ignore_invalid_cqcode: false
force_fragmented: true
fix_url: false
proxy_rewrite: ""
heartbeat_interval: -1
http_config: {
enabled: true
host: 0.0.0.0
port: 5700
timeout: 0
post_urls:
{
"127.0.0.1:5710":secret
}
}
ws_config: {
enabled: false
host: 0.0.0.0
port: 6700
}
ws_reverse_servers: [
{
enabled: false
reverse_url: ws://you_websocket_universal.server
reverse_api_url: ws://you_websocket_api.server
reverse_event_url: ws://you_websocket_event.server
reverse_reconnect_interval: 3000
}
]
post_message_format: string
use_sso_address: false
debug: false
log_level: ""
web_ui: null
}
其中uin为机器人QQ号
password为密码
enable_db为是否使用数据库,由于个人学习使用,这里并不开启数据库功能
heartbeat_interval为心跳间隔时间,默认开启,值小于0则关闭。
http_config进行http接口设置,post_urls设置上传接收端口
ws_config和ws_reverse_servers是Websocket的正向和反向接口,由于RabbitBot采用http接口,故这里也全部设置了false
RabbitBot开启后的显示结果:
信息发送和接收
RabbitBot利用python的socket库来进行数据的发送和接收,socket库的使用方法请自行学习。
5700为go-cqhttp默认端口,设置http post上报器端口为5710。
常用的HTTP API:
发送私聊信息:/send_private_msg
发送群聊信息:/send_group_msg
服务端接收信息配置:
ListenSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ListenSocket.bind((\'127.0.0.1\', 5710))
ListenSocket.listen(100)
HttpResponseHeader = \'\'\'HTTP/1.1 200 OK
Content-Type: text/html
\'\'\'
#定位有效信息
def request_to_json(msg):
for i in range(len(msg)):
if msg[i]=="{" and msg[-1]=="}":
return json.loads(msg[i:])
return None
#需要循环执行,返回值为json格式
def rev_msg():# json or None
conn, Address = ListenSocket.accept()
Request = conn.recv(1024).decode(encoding=\'utf-8\')
#print(Request)
rev_json=request_to_json(Request)
#print(rev_json)
conn.sendall((HttpResponseHeader).encode(encoding=\'utf-8\'))
conn.close()
return rev_json
接收到的群聊信息格式:
{‘anonymous’: None, ‘font’: 0, ‘group_id’: ×××××, ‘message’: ‘爱你’, ‘message_id’: 1425567994, ‘message_seq’: 2170, ‘message_type’: ‘group’, ‘post_type’: ‘message’, ‘raw_message’: ‘爱你’, ‘self_id’: ×××××, ‘sender’: {‘age’: 0, ‘area’: ‘’, ‘card’: ‘’, ‘level’: ‘’, ‘nickname’: ‘七月’, ‘role’: ‘owner’, ‘sex’: ‘unknown’, ‘title’: ‘’, ‘user_id’: ×××××}, ‘sub_type’: ‘normal’, ‘time’: 1611721421, ‘user_id’: ×××××}
group_id为群号,user_id为发送者QQ号,message为接收到的信息,message_id为信息编号,随机生成,message_type为信息类型。
接收到的私聊信息格式:
{‘font’: 0, ‘message’: ‘[CQ:face,id=107]’, ‘message_id’: -730420846, ‘message_type’: ‘private’, ‘post_type’: ‘message’, ‘raw_message’: ‘[CQ:face,id=107]’, ‘self_id’: ×××××, ‘sender’: {‘age’: 0, ‘nickname’: ‘七月’, ‘sex’: ‘unknown’, ‘user_id’: ×××××}, ‘sub_type’: ‘friend’, ‘time’: 1611726456, ‘user_id’: ×××××}
user_id为通话者QQ号,message为接收到的信息,**’[CQ:face,id=107]’**为表情数据,message_id为信息编号,随机生成,message_type为信息类型。
客户端发送信息配置:
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#从客户端发送给服务端
def client_to_conn():
label = get_message_type()
number = get_number()
msg = get_raw_message()
if flag == 0:
msg = txt_msg(get_raw_message())
if label == \'group\':
payload = "GET /send_group_msg?group_id=" + str(number) + "&message=" + msg + " HTTP/1.1\r\nHost: 127.0.0.1:5700\r\nConnection: close\r\n\r\n"
elif label == \'private\':
payload = "GET /send_private_msg?user_id=" + str(number) + "&message=××××" + " HTTP/1.1\r\nHost: 127.0.0.1:5700\r\nConnection: close\r\n\r\n"
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((\'127.0.0.1\',5700))
client.send(payload.encode("utf-8"))
client.close()
信息处理
RabbitBot的学习信息保存至一个txt文档中,个人学习问题不大,但做成项目的话不建议这么使用。
获取信息和处理信息的一些函数:
#获取信息类型 群聊/私聊 group/private
def get_message_type():
return all_message[\'message_type\']
#获取群号/私聊qq号
def get_number():
if get_message_type() == \'group\':
return all_message[\'group_id\']
elif get_message_type() == \'private\':
return all_message[\'user_id\']
else:
print(\'出错啦!找不到群号/QQ号\')
exit()
# 获取信息发送者的QQ号
def get_user_id():
return all_message[\'user_id\']
#获取发送的信息
def get_raw_message():
return all_message[\'raw_message\']
#查找txt文本数据库
def txt_msg(msg):
fp = open("/机器人/txt.txt", "r",encoding=\'utf-8\')
while 1:
s = fp.readline()
if not s:
fp.close()
if flag == 2:
return
return error()
s = s.strip(\'\n\')
s1 = s.split(\' \')[0]
s2 = s.split(\' \')[1]
if \'[CQ:at,qq=×××××] \' + s1 == msg:
fp.close()
return s2
发送信息的一些函数:
#帮助界面
def help_interface():
number = get_number()
payload = "GET /send_group_msg?group_id=" + str(number) + "&message=学习方式:%0a私聊rabbit酱,发送学习信息。%0a学习格式:%27学习%27%20%2b%20发送信息%20%2b%20回复信息,以空格分开%0a例:学习%20我爱你%20我也爱你" + " HTTP/1.1\r\nHost: 127.0.0.1:5700\r\nConnection: close\r\n\r\n"
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((\'127.0.0.1\',5700))
client.send(payload.encode("utf-8"))
client.close()
#错误
def error():
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((\'127.0.0.1\',5700))
rand = random.randint(1,4)
number = get_number()
if rand == 1:
msg = "我听不懂你在说什么哦"
elif rand == 2:
msg = "我好笨,听不懂呜呜呜"
elif rand == 3:
msg = "啊?发生了什么"
elif rand == 4:
msg = "干啥呢干啥呢"
payload = "GET /send_group_msg?group_id=" + str(number) + "&message=" + msg + " HTTP/1.1\r\nHost: 127.0.0.1:5700\r\nConnection: close\r\n\r\n"
client.send(payload.encode("utf-8"))
client.close()
#发送猫猫图
,图片保存在本地
def send_cat_pic():
global flag
flag = 1
cat_list = os.listdir("/data/catpic")
all_message[\'raw_message\'] = "[CQ:image,file=file:///data/catpic/"+ random.choice(cat_list)+"]"
client_to_conn()
#发送setu,图片从API内获取
def send_setu_pic():
apikey = \'×××××××××××××××\'
req_url="https://api.lolicon.app/setu/"
params = {"apikey":apikey}
res=requests.get(req_url,params=params)
setu_url=res.json()[\'data\'][0][\'url\']
all_message[\'raw_message\'] ="[CQ:image,file="+setu_url+"]"
client_to_conn()
私聊机器人学习数据:
#调教机器人
#这块代码也有点bug,需要后期调整。
def training_message():
s = get_raw_message()
if s.split(\' \')[0] != \'学习\':
return
s2 = s.split(\' \')[1]
s3 = s.split(\' \')[2]
s = s2 + \' \' + s3
fp = open("/机器人/txt.txt", "a",encoding=\'utf-8\')
fp.write(\'\n\')
fp.write(s)
fp.close()
client_to_conn()
获取数据的第一时间判断信息内容:
#首次判断信息内容
def first_judgement():
if get_message_type() == \'private\':
training_message()
if get_raw_message() == \'[CQ:at,qq=×××××××××] help\':
help_interface()
return
if get_raw_message() == \'[CQ:at,qq=×××××××××] setu\':
send_setu()
return
elif get_raw_message() == \'[CQ:at,qq=×××××××××] 猫猫图\':
send_cat_pic()
return
elif len(get_raw_message()) < 20: #即使不@,也有15%概率回复信息
rand = random.randint(1,20)
if rand <= 3:
global flag
flag = 2
all_message[\'raw_message\'] = \'[CQ:at,qq=×××××××××] \' + all_message[\'raw_message\']
client_to_conn()
else:
return
elif get_raw_message()[0:20] != \'[CQ:at,qq=×××××××××]\':
return
client_to_conn()
循环部分:
#flag为全局变量
#flag = 0 正常
#flag = 1 数据不通过数据库
#flag = 2 退出
#使用try、except语句保证程序不会因部分错误退出。
while 1:
global flag
flag = 0
all_message = rev_msg()
#print(all_message)
try:
first_judgement()
except:
continue
示例
支持收发文本、图片、表情等等。
利用api获取setu。
最终项目可搭载在服务器上不间断运行,目前只实现了基本功能,还有许多功能有待开发,欢迎大佬们来找我一起交流学习。