UserAuthUtils.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. from model.DataBaseUtils import MysqlUtils
  2. import base64
  3. import requests
  4. import time
  5. from config.using_config import aes_token
  6. from urllib.parse import urlencode
  7. from Crypto.Cipher import AES
  8. def get_g_token(g_time, params):
  9. bs = AES.block_size
  10. pad = lambda s: s + (bs - len(s) % bs) * chr(bs - len(s) % bs)
  11. # 设置加密方式
  12. cipher = AES.new(aes_token.encode('utf-8'), AES.MODE_ECB)
  13. # 设置原始数据
  14. pad_params = params + str(g_time)
  15. # 数据加密
  16. data = cipher.encrypt(pad(pad_params).encode('utf-8'))
  17. g_token = (base64.encodebytes(data)).decode('utf8').strip()
  18. return g_token
  19. def get_auth_user(user_id):
  20. """获取用户拥有所有用户(包括自己)的权限"""
  21. g_time = int(time.time())
  22. base_url = 'https://api.zanxiangnet.com'
  23. params = f'/erp/api/user/subUser/3/{user_id}'
  24. print(params)
  25. g_token = get_g_token(g_time, params)
  26. params_other_urlencode = urlencode({'g_time': g_time, 'g_token': g_token})
  27. url = base_url + params + '?' + params_other_urlencode
  28. rsp = requests.get(url)
  29. nick_name_list = []
  30. for _ in rsp.json()['data']:
  31. nick_name_list.append(_['nickName'])
  32. return nick_name_list
  33. def get_auth_channel_self(user_id):
  34. g_time = int(time.time())
  35. base_url = 'https://api.zanxiangnet.com'
  36. params = '/erp/api/resource/search/3'
  37. g_token = get_g_token(g_time, params)
  38. params_other_urlencode = urlencode({'g_time': g_time, 'g_token': g_token})
  39. params_other = f'?userIds={user_id}'
  40. url = base_url + params + params_other + '&' + params_other_urlencode
  41. rsp = requests.get(url)
  42. channel_list = rsp.json()['data']
  43. data1 = []
  44. if channel_list:
  45. for _ in channel_list:
  46. data1.append(_['resourceName'])
  47. return tuple(data1)
  48. def get_auth_channel(user_id):
  49. """获取用户拥有的所有公众号权限"""
  50. db = MysqlUtils()
  51. # 普通权限------java,获取本人对应拥有的公众号
  52. g_time = int(time.time())
  53. base_url = 'https://api.zanxiangnet.com'
  54. params = '/erp/api/resource/search/3'
  55. g_token = get_g_token(g_time, params)
  56. params_other_urlencode = urlencode({'g_time': g_time, 'g_token': g_token})
  57. params_other = f'?userIds={user_id}'
  58. url = base_url + params + params_other + '&' + params_other_urlencode
  59. rsp = requests.get(url)
  60. channel_list = rsp.json()['data']
  61. data1 = []
  62. if channel_list:
  63. for _ in channel_list:
  64. data1.append(_['resourceName'])
  65. sql2 = f"""select GROUP_CONCAT(channel_ids) from user_channel_group_auth a
  66. left join channel_group b on a.channel_group_id=b.id
  67. where user_id={user_id}"""
  68. data2 = db.quchen_text.getOne(sql2)
  69. data3 = []
  70. if data2:
  71. g_time = int(time.time())
  72. base_url = 'https://api.zanxiangnet.com'
  73. params = '/erp/api/resource/search/3'
  74. g_token = get_g_token(g_time, params)
  75. params_other_urlencode = urlencode({'g_time': g_time, 'g_token': g_token})
  76. url = base_url + params + '?' + params_other_urlencode
  77. rsp = requests.get(url)
  78. channel_list = []
  79. channel_dict = {}
  80. for _ in rsp.json()['data']:
  81. channel_list.append((_['id'], _['resourceName']))
  82. channel_dict[_['id']] = _['resourceName']
  83. data2_2 = tuple(data2.split(','))
  84. for _ in data2_2:
  85. data3.append(channel_dict[int(_)])
  86. return tuple(data1 + data3)
  87. def super_auth():
  88. "获取超级数据权限的用户列表"
  89. g_time = int(time.time())
  90. base_url = 'https://api.zanxiangnet.com'
  91. params = '/erp/api/user/search/3'
  92. g_token = get_g_token(g_time, params)
  93. params_other_urlencode = urlencode({'g_time': g_time, 'g_token': g_token})
  94. url = base_url + params + '?' + params_other_urlencode
  95. rsp = requests.get(url)
  96. print(rsp.text)
  97. user_list = []
  98. for _ in rsp.json()['data']:
  99. if _['powerLevel'] >= 99:
  100. user_list.append(_['userId'])
  101. print(user_list)
  102. return user_list
  103. if __name__ == '__main__':
  104. print(super_auth())
  105. # print(get_role(78))
  106. # print(f"ssed{tuple([1, 3, 4])}")
  107. # xx = super_auth()
  108. # print(xx)