"""Helpers for pushing text notifications to a DingTalk custom robot webhook."""
import base64
import hashlib
import hmac
import json
import logging
import re
import socket
import time
import urllib.parse

import requests

from settings import ding_talk

# Loose dotted-quad matcher used to scrape an IPv4 address out of an HTML page.
_IPV4_PATTERN = re.compile(r'\d+\.\d+\.\d+\.\d+')

# Seconds to wait on any outbound HTTP call before giving up, so that a hung
# remote endpoint cannot block the caller forever.
_HTTP_TIMEOUT = 10


def intranet_ip():
    """Return the local IP address of the interface used for outbound traffic.

    A UDP socket is "connected" to a public address; no datagram is sent by
    ``connect`` on a UDP socket, but the OS selects the outgoing interface,
    whose address is then read back with ``getsockname``.

    :return: the local address as a string
    :raises OSError: if the socket cannot be created or connected
    """
    # The context manager closes the socket even when connect() fails, and
    # avoids touching an unbound name if socket creation itself raises.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.connect(('8.8.8.8', 80))
        return str(sock.getsockname()[0])


def web_ip():
    """Return the first IPv4-looking string found on the chinaz IP page.

    :return: the first dotted-quad match in the response body, or ``None``
             when the page contains no match
    :raises requests.RequestException: if the HTTP request fails or times out
    """
    response = requests.get('http://ip.tool.chinaz.com/', timeout=_HTTP_TIMEOUT)
    match = _IPV4_PATTERN.search(response.text)
    return match.group(0) if match else None


def sign_dingtalk():
    """Build the ``(timestamp, sign)`` pair for a signed DingTalk robot call.

    The string ``"<timestamp>\\n<secret>"`` is HMAC-SHA256 signed with the
    secret from ``ding_talk['secret_sign']``, base64 encoded and then
    URL-quoted.

    :return: tuple of (millisecond timestamp as int, URL-quoted signature)
    """
    timestamp = round(time.time() * 1000)
    secret = ding_talk['secret_sign']
    payload = '{}\n{}'.format(timestamp, secret)
    digest = hmac.new(
        bytes(secret, 'utf-8'),
        bytes(payload, 'utf-8'),
        digestmod=hashlib.sha256,
    ).digest()
    sign = urllib.parse.quote(base64.b64encode(digest))
    return timestamp, sign


def _normalize_mobiles(atsombody):
    """Turn the ``atsombody`` argument into a flat list of mobile numbers."""
    if atsombody is None:
        return []
    if isinstance(atsombody, (list, tuple, set)):
        return list(atsombody)
    return [atsombody]


def send_message(content, atsombody=None, is_self=True, is_carryip=False):
    """Send a text message through the DingTalk robot webhook.

    Failures are logged rather than raised: any exception during the send is
    caught and reported via ``logging.error``.

    :param content: text to send; truncated to the first 10000 characters
    :param atsombody: mobile number(s) to @-mention, either a single value or
                      a collection such as ``["156xxxx8827", "151XXX2316"]``;
                      ``None`` mentions nobody
    :param is_self: currently unused; kept for backward compatibility
    :param is_carryip: when true, append the intranet and public IP addresses
                       of this host to the message
    :return: None
    """
    try:
        # NOTE(review): the access token is hard-coded in source. Consider
        # moving it to configuration next to ding_talk['secret_sign'].
        base_url = 'https://oapi.dingtalk.com/robot/send?access_token=dc19e91491341b81f29d7fc347071647031e3d33d8df73f15e4529ad01520e22'
        timestamp, sign = sign_dingtalk()
        url = base_url + "&timestamp={timestamp}&sign={sign}".format(timestamp=timestamp, sign=sign)

        if is_carryip:
            ip_intranet = intranet_ip()
            ip_public = web_ip()
            ip_content = '内网ip {} ,公网ip {}'.format(ip_intranet, ip_public)
            content = content + '\n' + ip_content

        data = {
            "msgtype": "text",
            "text": {
                "content": content[:10000]
            },
            "at": {
                "atMobiles": _normalize_mobiles(atsombody),
                "isAtAll": False
            }
        }
        headers = {'content-type': 'application/json'}

        # Pause before each send, as the original did (presumably to stay
        # under the robot's rate limit - confirm).
        time.sleep(1)
        response = requests.post(
            url=url,
            headers=headers,
            data=json.dumps(data),
            timeout=_HTTP_TIMEOUT,
        )
        if response.json()['errcode'] == 0:
            logging.info('钉钉信息:{} 发送成功'.format(content))
        else:
            # Back off after a rejected message before returning to the caller.
            time.sleep(10)
            logging.warning('钉钉信息出现问题:{}'.format(str(response.text)))
    except Exception as e:
        # Best-effort notifier: report the failure and back off, never raise.
        logging.error('钉钉发消息发生问题 error:{} '.format(e))
        time.sleep(10)