"""Helpers for querying user and channel permissions from the ERP HTTP API."""
import base64
import logging
import time
from urllib.parse import urlencode

import requests
from Crypto.Cipher import AES

from config.using_config import aes_token
from model.DataBaseUtils import MysqlUtils

_logger = logging.getLogger(__name__)

_BASE_URL = 'https://api.zanxiangnet.com'
_RESOURCE_SEARCH_PATH = '/erp/api/resource/search/3'
_USER_SEARCH_PATH = '/erp/api/user/search/3'
# Seconds to wait for the API before giving up; without a timeout
# ``requests`` blocks forever on a stalled connection.
_REQUEST_TIMEOUT = 30
# Users whose ``powerLevel`` is at or above this value are returned by
# ``super_auth``.
_SUPER_POWER_LEVEL = 99


def get_g_token(g_time, params) -> str:
    """Build the ``g_token`` request signature.

    The token is ``params`` concatenated with ``g_time``, PKCS#7-padded,
    AES-ECB encrypted with ``aes_token`` and base64 encoded.

    Args:
        g_time: Timestamp appended to ``params`` before encryption.
        params: The request path being signed (a ``str``).

    Returns:
        The base64 text of the ciphertext with surrounding whitespace
        stripped.
    """
    block_size = AES.block_size
    # Pad the encoded bytes rather than the text: padding computed on the
    # character count is wrong as soon as the input holds a multi-byte
    # character, and the cipher then rejects the unaligned buffer.
    raw = (params + str(g_time)).encode('utf-8')
    pad_len = block_size - len(raw) % block_size
    padded = raw + bytes([pad_len]) * pad_len
    # NOTE(review): ECB mode is kept because the token must match what the
    # server computes; it is not a choice to copy elsewhere.
    cipher = AES.new(aes_token.encode('utf-8'), AES.MODE_ECB)
    encrypted = cipher.encrypt(padded)
    # NOTE(review): encodebytes inserts a newline every 76 output
    # characters and strip() only removes the trailing one, so inputs long
    # enough to exceed one base64 line keep embedded newlines. Left
    # unchanged to stay compatible with the existing token format.
    return base64.encodebytes(encrypted).decode('utf8').strip()


def _fetch_data(path, query=None):
    """GET a signed API path and return the ``data`` field of its JSON body.

    Args:
        path: API path; it is also the string signed by ``get_g_token``.
        query: Optional mapping of extra query parameters, sent ahead of
            ``g_time`` and ``g_token``.

    Returns:
        The ``data`` value of the response, or an empty list when it is
        null or empty.
    """
    g_time = int(time.time())
    signed_query = dict(query or {})
    signed_query['g_time'] = g_time
    signed_query['g_token'] = get_g_token(g_time, path)
    url = f'{_BASE_URL}{path}?{urlencode(signed_query)}'
    rsp = requests.get(url, timeout=_REQUEST_TIMEOUT)
    _logger.debug('GET %s returned %s', path, rsp.text)
    return rsp.json()['data'] or []


def get_auth_user(user_id) -> list:
    """Return the nicknames of all users (including the user themself)
    that ``user_id`` has permission over.

    Args:
        user_id: User id placed in the ``subUser`` API path.

    Returns:
        A list with the ``nickName`` of every entry the API returns.
    """
    sub_users = _fetch_data(f'/erp/api/user/subUser/3/{user_id}')
    return [user['nickName'] for user in sub_users]


def get_auth_channel_self(user_id) -> tuple:
    """Return the resource names the API reports for ``user_id`` itself.

    Args:
        user_id: Value sent as the ``userIds`` query parameter.

    Returns:
        A tuple with the ``resourceName`` of every returned resource;
        empty when the API returns no data.
    """
    resources = _fetch_data(_RESOURCE_SEARCH_PATH, {'userIds': user_id})
    return tuple(resource['resourceName'] for resource in resources)


def get_auth_channel(user_id) -> tuple:
    """Return every official-account (channel) name the user may access.

    Combines the channels the API reports for the user directly with the
    channels granted through channel groups recorded in the database.

    Args:
        user_id: Numeric user id (``int`` or numeric string).

    Returns:
        A tuple of resource names: directly owned channels first, then the
        channels resolved from the user's channel groups.

    Raises:
        ValueError: If ``user_id`` is not an integer value.
    """
    # Coerce before touching SQL so arbitrary text can never reach the
    # query string.
    numeric_user_id = int(user_id)
    db = MysqlUtils()

    # Regular permission: channels the API lists for this user.
    own_channels = get_auth_channel_self(user_id)

    sql = (
        "select GROUP_CONCAT(channel_ids) from user_channel_group_auth a "
        "left join channel_group b on a.channel_group_id=b.id "
        f"where user_id={numeric_user_id}"
    )
    grouped_ids = db.quchen_text.getOne(sql)
    if not grouped_ids:
        return own_channels

    # Group permission: translate the stored channel ids into names using
    # the full resource list.
    names_by_id = {
        resource['id']: resource['resourceName']
        for resource in _fetch_data(_RESOURCE_SEARCH_PATH)
    }
    group_channels = []
    for raw_id in grouped_ids.split(','):
        raw_id = raw_id.strip()
        if not raw_id:
            # Empty channel_ids values leave blank fragments in the
            # concatenated string.
            continue
        channel_id = int(raw_id)
        if channel_id not in names_by_id:
            # The id is not in the API's resource list; skip it rather
            # than fail the whole permission lookup.
            _logger.warning(
                'channel id %s for user %s not found in resource list',
                channel_id, numeric_user_id,
            )
            continue
        group_channels.append(names_by_id[channel_id])
    return own_channels + tuple(group_channels)


def super_auth() -> list:
    """Return the ids of users holding super data permission.

    Returns:
        A list with the ``userId`` of every user whose ``powerLevel`` is
        at least 99.
    """
    users = _fetch_data(_USER_SEARCH_PATH)
    user_list = [
        user['userId']
        for user in users
        if user['powerLevel'] >= _SUPER_POWER_LEVEL
    ]
    _logger.debug('super users: %s', user_list)
    return user_list


if __name__ == '__main__':
    print(super_auth())