123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- from model.DataBaseUtils import MysqlUtils
- import base64
- import requests
- import time
- from config.using_config import aes_token
- from urllib.parse import urlencode
- from Crypto.Cipher import AES
- from Crypto.Util.Padding import pad
def get_g_token(g_time, params):
    """Build the ``g_token`` signature sent with an ERP API request.

    The request path and the timestamp are concatenated, padded to the AES
    block size, encrypted with AES in ECB mode under the configured
    ``aes_token`` key, and Base64-encoded.

    Args:
        g_time: Timestamp sent alongside the token; appended via ``str()``.
        params: Request path being signed, e.g. ``'/erp/api/user/search/3'``.

    Returns:
        The Base64 text of the ciphertext as a single line.
    """
    cipher = AES.new(aes_token.encode('utf-8'), AES.MODE_ECB)
    plaintext = (params + str(g_time)).encode('utf-8')
    encrypted = cipher.encrypt(pad(plaintext, AES.block_size))
    # b64encode never inserts line breaks. encodebytes adds "\n" after every
    # 76 output characters, and strip() only removed the trailing one, so
    # longer paths produced tokens with embedded newlines.
    return base64.b64encode(encrypted).decode('utf8')
def get_auth_user(user_id):
    """Return the nicknames of all users ``user_id`` has permission over.

    According to the original note, the result includes the user themself.

    Args:
        user_id: Id of the user, interpolated into the request path.

    Returns:
        A list with the ``nickName`` of every entry in the response ``data``;
        an empty list when ``data`` is empty or null.
    """
    g_time = int(time.time())
    base_url = 'https://api.zanxiangnet.com'
    params = f'/erp/api/user/subUser/3/{user_id}'
    g_token = get_g_token(g_time, params)
    query = urlencode({'g_time': g_time, 'g_token': g_token})
    rsp = requests.get(base_url + params + '?' + query)
    # The sibling channel lookups guard against an empty/null ``data``
    # payload; do the same here instead of failing on iteration.
    users = rsp.json()['data'] or []
    return [user['nickName'] for user in users]
def get_auth_channel_self(user_id):
    """Return the resource names found by the resource search for ``user_id``.

    Args:
        user_id: Value passed as the ``userIds`` query parameter.

    Returns:
        A tuple with the ``resourceName`` of every entry in the response
        ``data``; an empty tuple when ``data`` is empty or null.
    """
    stamp = int(time.time())
    endpoint = '/erp/api/resource/search/3'
    signature = urlencode({'g_time': stamp, 'g_token': get_g_token(stamp, endpoint)})
    url = 'https://api.zanxiangnet.com' + endpoint + f'?userIds={user_id}' + '&' + signature
    resources = requests.get(url).json()['data']
    if not resources:
        return ()
    return tuple(resource['resourceName'] for resource in resources)
def get_auth_channel(user_id):
    """Return every official-account (resource) name the user may access.

    Two sources are merged:

    1. Names returned by the ERP ``subUserResourceList`` endpoint for the
       user (the path carries the literal segment ``投手,运营`` — presumably
       role names; confirm against the API).
    2. Names of the channels listed in the channel groups granted to the user
       in ``user_channel_group_auth``, resolved to names through the resource
       search endpoint.

    Args:
        user_id: Numeric id of the user (an ``int`` or a numeric string).

    Returns:
        A tuple of resource names: the endpoint results first, then the
        channel-group results.

    Raises:
        ValueError: If ``user_id`` is not an integer value.
        KeyError: If a granted channel id is missing from the search result.
    """
    # The id is interpolated into SQL below; force it to an integer up front
    # so arbitrary text can never reach the query.
    sql_user_id = int(user_id)
    db = MysqlUtils()
    base_url = 'https://api.zanxiangnet.com'

    # Regular permission: resources the ERP service reports for this user.
    g_time = int(time.time())
    params = f'/erp/api/resource/subUserResourceList/3/{user_id}/投手,运营'
    g_token = get_g_token(g_time, params)
    query = urlencode({'g_time': g_time, 'g_token': g_token})
    # Drop any URL-encoded newline carried by the token.
    query = query.replace('%0A', '')
    rsp = requests.get(base_url + params + '?' + query)
    own_channels = [item['resourceName'] for item in rsp.json()['data'] or []]

    sql = f"""select GROUP_CONCAT(channel_ids) from user_channel_group_auth a
        left join channel_group b on a.channel_group_id=b.id
        where user_id={sql_user_id}"""
    grouped_ids = db.quchen_text.getOne(sql)

    group_channels = []
    if grouped_ids:
        g_time = int(time.time())
        params = '/erp/api/resource/search/3'
        g_token = get_g_token(g_time, params)
        query = urlencode({'g_time': g_time, 'g_token': g_token})
        rsp = requests.get(base_url + params + '?' + query)
        names_by_id = {item['id']: item['resourceName'] for item in rsp.json()['data']}
        # Skip empty segments (e.g. a group with a blank ``channel_ids``),
        # which would otherwise crash ``int('')``.
        group_channels = [
            names_by_id[int(channel_id)]
            for channel_id in grouped_ids.split(',')
            if channel_id.strip()
        ]
    return tuple(own_channels + group_channels)
def get_auth_game_info(user_id):
    """Return the games linked to the advertiser accounts pitched by a user.

    The user's nickname is looked up through the ERP user search endpoint and
    then matched against the ``pitcher`` column of ``advertiser_vx``.

    Args:
        user_id: Id of the user; compared as a string with each ``userId``.

    Returns:
        Whatever ``get_data_list`` yields for the query: one row per game with
        its id, its name, and the earliest ``start_date`` as ``%Y-%m-%d``.
    """
    db = MysqlUtils()

    # Step 1: resolve the user's nickname (the last matching entry wins).
    stamp = int(time.time())
    endpoint = '/erp/api/user/search/3'
    signature = urlencode({'g_time': stamp, 'g_token': get_g_token(stamp, endpoint)})
    response = requests.get('https://api.zanxiangnet.com' + endpoint + '?' + signature)
    wanted = str(user_id)
    nick_names = [
        entry['nickName']
        for entry in response.json()['data']
        if str(entry['userId']) == wanted
    ]
    user_name = nick_names[-1] if nick_names else None

    # Step 2: fetch the games for that pitcher.
    # NOTE(review): the nickname is interpolated into the SQL text as-is (and
    # becomes the literal 'None' when no user matched) — consider a
    # parameterised query if the DB helper supports one.
    sql = f'''
        SELECT d.id ,d.name,min(DATE_FORMAT(a.start_date,"%Y-%m-%d"))
        FROM quchen_text.advertiser_vx a
        left join db_mp.mp_mp_conf b on a.name =b.wx_name
        left join db_mp.mp_conf_agent c on c.advertiser_conf_id = b.id
        left join db_mp.h_game d on c.app_id = d.id
        where pitcher ='{user_name}' and d.id is not null
        group by id
        '''
    return db.quchen_text.get_data_list(sql)
def get_auth_game_name(user_id):
    """Return the names of the games linked to a user's advertiser accounts.

    The user's nickname is looked up through the ERP user search endpoint and
    then matched against the ``pitcher`` column of ``advertiser_vx``.

    Args:
        user_id: Id of the user; compared as a string with each ``userId``.

    Returns:
        Whatever ``get_data_list`` yields for the query: one row per distinct
        non-null game name.
    """
    db = MysqlUtils()

    # Step 1: resolve the user's nickname (the last matching entry wins).
    stamp = int(time.time())
    endpoint = '/erp/api/user/search/3'
    signature = urlencode({'g_time': stamp, 'g_token': get_g_token(stamp, endpoint)})
    response = requests.get('https://api.zanxiangnet.com' + endpoint + '?' + signature)
    wanted = str(user_id)
    nick_names = [
        entry['nickName']
        for entry in response.json()['data']
        if str(entry['userId']) == wanted
    ]
    user_name = nick_names[-1] if nick_names else None

    # Step 2: fetch the game names for that pitcher.
    # NOTE(review): the nickname is interpolated into the SQL text as-is (and
    # becomes the literal 'None' when no user matched) — consider a
    # parameterised query if the DB helper supports one.
    sql = f'''
        SELECT d.name
        FROM quchen_text.advertiser_vx a
        left join db_mp.mp_mp_conf b on a.name =b.wx_name
        left join db_mp.mp_conf_agent c on c.advertiser_conf_id = b.id
        left join db_mp.h_game d on c.app_id = d.id
        where pitcher ='{user_name}' and d.name is not null
        group by name
        '''
    return db.quchen_text.get_data_list(sql)
def super_auth():
    """Return the ids of the users holding super data permission.

    A user qualifies when the ``powerLevel`` reported by the ERP user search
    endpoint is at least 99.

    Returns:
        A list with the ``userId`` of every qualifying user.
    """
    g_time = int(time.time())
    base_url = 'https://api.zanxiangnet.com'
    params = '/erp/api/user/search/3'
    g_token = get_g_token(g_time, params)
    query = urlencode({'g_time': g_time, 'g_token': g_token})
    rsp = requests.get(base_url + params + '?' + query)
    # The raw response and the result are no longer printed: dumping the whole
    # user directory to stdout was debugging residue.
    return [user['userId'] for user in rsp.json()['data'] if user['powerLevel'] >= 99]
if __name__ == '__main__':
    # Manual smoke test: print the game names resolved for user 85.
    game_names = get_auth_game_name(85)
    print(game_names)
|