send.py
# -*- coding:utf8 -*-
# Demo script: look up a follower's openid and send that follower a
# WeChat template message via the helpers in reqwx.py.
import requests
import json
from reqwx import get_access_token, send_module_msg, get_module_id

a_Token = get_access_token()

# Customer-service message endpoint; defined here but not used below.
url = "https://api.weixin.qq.com/cgi-bin/message/custom/send"

# Fetch the follower list and take the first openid as the recipient.
req_get_uid_url = "https://api.weixin.qq.com/cgi-bin/user/get"
para_get_uid_data = {"access_token": a_Token}
res_get_uid = req_get_uid = requests.get(
    req_get_uid_url, params=para_get_uid_data
)
res_b = req_get_uid.content
res = json.loads(res_b)
print(res)
openid = res["data"]["openid"][0]

# Plain-text message body; built here but not sent by this script.
ss = "你好"
txt_body = {
    "touser": openid,
    "msgtype": "text",
    "text": {
        "content": ss
    }
}

# Template data: one entry per template field, each with a value and colour.
sys_name = "测试系统说明"
sys_info = "测试信息内容"
ss_body = {
    "sysname": {
        "value": sys_name,
        "color": "#FF3117"
    },
    "sysinfo": {
        "value": sys_info,
        "color": "#173177"
    }
}

# Use the first template returned for the account and send the message.
module_id = get_module_id(a_Token)
send_module_msg(a_Token, openid, module_id, ss_body)

reqwx.py

import json

import requests

from config import APPID, SECRET   #get config from config.py
 4 
 5 
 6 def to_json(str):
 7     return json.loads(str)
 8 
 9 
def get_access_token():
    """Fetch the global WeChat API access token.

    Requests a ``client_credential`` token from the WeChat token endpoint
    using ``APPID`` and ``SECRET``. The original author notes the token is
    valid for 2 hours by default.

    On success the token is also written to ``config.ini`` as the literal
    text ``"token"`` followed by the token value.

    :return: the access token string, or ``None`` when the response does
        not contain a usable ``access_token``.
    """
    result = requests.get(
        url="https://api.weixin.qq.com/cgi-bin/token",
        params={
            "grant_type": "client_credential",
            "appid": APPID,
            "secret": SECRET
        }
    ).json()

    access_token = result.get("access_token") or None
    if access_token is None:
        # No token in the response: return early so config.ini is not
        # truncated and "token" + None does not raise TypeError.
        return None

    # The context manager closes the file even if the write fails.
    with open("config.ini", "w") as f:
        f.write("token" + access_token)
    return access_token
31 
32 
def sendmsg(openid, a_Token, msg):
    """Send a plain-text customer-service message to one user.

    :param openid: openid of the recipient.
    :param a_Token: access token passed as the ``access_token`` query
        parameter.
    :param msg: text of the message.
    :return: raw response body (bytes) from the send endpoint, mirroring
        ``send_module_msg``.
    """
    send_url = "https://api.weixin.qq.com/cgi-bin/message/custom/send"
    body = {
        "touser": openid,
        "msgtype": "text",
        "text": {
            # The text payload key is "content" (as used by send.py);
            # the previous "context" key was a typo.
            "content": msg
        }
    }
    # bytes(dict, encoding=...) raises TypeError, so serialise explicitly.
    # ensure_ascii=False keeps non-ASCII text as real UTF-8 rather than
    # \uXXXX escapes -- presumably what the API expects for Chinese text;
    # confirm against the WeChat documentation.
    payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
    s_req = requests.post(
        send_url, params={"access_token": a_Token}, data=payload
    )
    return s_req.content
49 
50 
def send_module_msg(a_Token, openid, t_id, msg):
    """Send a template message to one user.

    :param a_Token: access token passed as the ``access_token`` query
        parameter.
    :param openid: openid of the recipient.
    :param t_id: id of the message template to use.
    :param msg: template data, sent as the ``data`` field of the request.
    :return: raw response body (bytes) from the template send endpoint.
    """
    send_url = "https://api.weixin.qq.com/cgi-bin/message/template/send"
    para = {"access_token": a_Token}
    body = {
        "touser": openid,
        "topcolor": "#FE0000",
        "template_id": t_id,
        "data": msg
    }
    # json= lets requests serialise the body and set the content type.
    r = requests.post(send_url, params=para, json=body)
    return r.content
70 
71 
def get_module_id(a_Token):
    """Return the id of the first message template of the account.

    :param a_Token: access token passed as the ``access_token`` query
        parameter.
    :return: the ``template_id`` of the first entry in the response's
        ``template_list``.
    :raises KeyError: if the response has no ``template_list`` or the
        first entry has no ``template_id``.
    :raises IndexError: if ``template_list`` is empty.
    """
    para_m_url = "https://api.weixin.qq.com/cgi-bin/template/get_all_private_template"
    req_m_id = requests.post(para_m_url, params={"access_token": a_Token})
    module_list = to_json(req_m_id.content)
    module_id = module_list["template_list"][0]["template_id"]
    return module_id

后期优化:

token 持久化

获取人员和消息模板,发送消息。

集成到其他内容,比如监控发消息,自动化测试发送结果等

版权声明:本文为malone原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/malone/p/10529663.html