test_multiprocessing.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. __title__ = '测试多线程类'
  5. @Time : 2020/9/26 22:53
  6. @Author : Kenny-PC
  7. @Software: PyCharm
  8. # code is far away from bugs with the god animal protecting
  9. I love animals. They taste delicious.
  10. ┏┓ ┏┓
  11. ┏┛┻━━━┛┻┓
  12. ┃ ☃ ┃
  13. ┃ ┳┛ ┗┳ ┃
  14. ┃ ┻ ┃
  15. ┗━┓ ┏━┛
  16. ┃ ┗━━━┓
  17. ┃ 神兽保佑 ┣┓
  18. ┃ 永无BUG! ┏┛
  19. ┗┓┓┏━┳┓┏┛
  20. ┃┫┫ ┃┫┫
  21. ┗┻┛ ┗┻┛
  22. """
  23. '''  使用16线程爬取信息
  24. 任务添加函数、任务执行函数;进程、线程切换函数;进、线程开启函数;
  25. '''
  26. import ssl
  27. from urllib import request, parse
  28. import requests
  29. ssl._create_default_https_context = ssl._create_unverified_context
  30. from datetime import datetime
  31. from multiprocessing import Pool as ProcessPoll # 进程池
  32. from multiprocessing.dummy import Pool as ThreadPool # 线程池
  33. import time
  34. import datetime
  35. import hashlib
  36. import json
  37. from util import date_util, platform_util
  38. import account_list as al
  39. zzy_order_list = ()
  40. def md5value(s):
  41. md5 = hashlib.md5()
  42. md5.update(s.encode("utf-8"))
  43. return md5.hexdigest()
  44. # 任务执行
  45. def get_page(task_q):
  46. headers = {
  47. 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36'
  48. }
  49. # url action
  50. qs = parse.parse_qs(task_q)
  51. my_key = str(qs['a'][0])
  52. secert = str(qs['b'][0])
  53. stage = ''
  54. if 'c' in qs.keys():
  55. stage = qs['c'][0]
  56. strlist = task_q.split('&a=')
  57. task_q = strlist[0]
  58. req = request.Request(task_q, headers=headers)
  59. response = request.urlopen(req)
  60. responseBodyStr = response.read().decode('utf8')
  61. channel_list = json.loads(responseBodyStr)
  62. # 掌中云的时间格式比较特殊,转换下
  63. st = platform_util.getZzyQueryTime(date_util.getYesterdayStartTime())
  64. et = platform_util.getZzyQueryTime(date_util.getYesterdayEndTime())
  65. if 'data' in channel_list:
  66. items = channel_list['data']['items']
  67. # items = []
  68. else:
  69. print('channel_list', channel_list)
  70. items = []
  71. keyChildOrder = ()
  72. for item in items: # 获取channel_id 后逐个拉取历史orders
  73. r = ()
  74. channel_id = item['id']
  75. channel = item['nickname']
  76. status = str(1)
  77. per_page = str(1000)
  78. limit_time = et
  79. get_time = st
  80. lt = parse.urlencode({'created_at[lt]': limit_time})
  81. gt = parse.urlencode({'created_at[gt]': get_time})
  82. url_1 = 'https://openapi.818tu.com/partners/channel/orders/list?'
  83. my_sign_1 = md5value(secert + 'channel_id=' + str(
  84. channel_id) + '&created_at[gt]=' + get_time + '&created_at[lt]=' + limit_time + '&key=' + my_key + '&per_page=' + per_page + '&status=' + status)
  85. parameter_1 = 'channel_id=' + str(
  86. channel_id) + '&' + gt + '&' + lt + '&per_page=' + per_page + '&status=' + status + '&key=' + my_key + '&sign=' + my_sign_1
  87. orders = requests.get(url_1 + parameter_1)
  88. t = orders.json()['data']['count'] // int(per_page) + 1
  89. for page in range(1, t + 1):
  90. my_sign_2 = md5value(secert + 'channel_id=' + str(
  91. channel_id) + '&created_at[gt]=' + get_time + '&created_at[lt]=' + limit_time + '&key=' + my_key + '&page=' + str(
  92. page) + '&per_page=' + per_page + '&status=' + status)
  93. parameter_2 = 'channel_id=' + str(channel_id) + '&' + gt + '&' + lt + '&page=' + str(
  94. page) + '&per_page=' + per_page + '&status=' + status + '&key=' + my_key + '&sign=' + my_sign_2
  95. orders_1 = requests.get(url_1 + parameter_2)
  96. b = orders_1.json()['data']['items']
  97. for a in b:
  98. c = {}
  99. c['user_id'] = str(a['member']['openid'])
  100. c['channel'] = channel
  101. c['reg_time'] = a['member']['created_at']
  102. c['channel_id'] = channel_id
  103. c['amount'] = round(a['price'] / 100, 2)
  104. c['order_id'] = str(a['id'])
  105. c['order_time'] = a['created_at']
  106. c['platform'] = '掌中云'
  107. c['stage'] = stage
  108. # c['amount']=a['amount']
  109. dtime = datetime.datetime.strptime(a['created_at'][0:10], "%Y-%m-%d")
  110. c['date'] = ((int(time.mktime(dtime.timetuple())) + 8 * 3600) // 86400) * 86400 - 8 * 3600
  111. if str(a['from_novel_id']) != 'None':
  112. c['from_novel'] = a['from_novel']['title']
  113. else:
  114. c['from_novel'] = 'None'
  115. """
  116. del a['member']
  117. del a['referral_link_id']
  118. del a['id']
  119. del a['created_at']
  120. del a['paid_at']
  121. del a['border_id']
  122. del a['from_novel_id']
  123. del a['status']
  124. del a['price']
  125. del a['agent_uid']
  126. """
  127. x = sorted(c.items(), key=lambda item: item[0])
  128. x = dict(x)
  129. x = tuple(x.values())
  130. r = r + ((x),)
  131. print('zzy_my_key_chanel', my_key, channel, len(r))
  132. if len(r) > 0:
  133. keyChildOrder = r + keyChildOrder
  134. print('zzy_my_key:', my_key, ' 下的订单数量为:', len(keyChildOrder))
  135. return keyChildOrder
  136. # 任务添加
  137. def url_list():
  138. task_list = []
  139. API_list = al.zzy_account_list
  140. url = 'https://openapi.818tu.com/partners/channel/channels/list?'
  141. for x in API_list:
  142. my_key = x[0]
  143. secert = x[1]
  144. stage = x[2]
  145. my_sign = md5value(secert + 'key=' + my_key)
  146. # todo 这里一定要手动编码,坑爹的
  147. my_key = parse.quote(x[0])
  148. parameter = 'key=' + my_key + '&sign=' + my_sign
  149. real_url = url + parameter
  150. full_url = real_url + '&a=' + my_key + '&b=' + secert + '&c=' + stage
  151. task_list.append(full_url)
  152. return task_list
  153. # 设定进、线程
  154. def get_pool(way=True, count=4):
  155. if way:
  156. # 进程
  157. pool = ProcessPoll(count)
  158. else:
  159. # 线程
  160. pool = ThreadPool(count)
  161. return pool
  162. # 启动
  163. def open_pool():
  164. pool = get_pool(way=False, count=16)
  165. task_q = url_list()
  166. results = pool.map(get_page, task_q) # 函数, 列表或元组
  167. pool.close()
  168. pool.join()
  169. totalCount = 0
  170. if len(results) > 0:
  171. for item in results:
  172. totalCount = totalCount + len(item)
  173. print('掌中云中订单数据:', totalCount)
  174. if __name__ == '__main__':
  175. start_second_time = date_util.getCurrentSecondTime()
  176. open_pool()
  177. print('执行时间:', date_util.getCurrentSecondTime() - start_second_time)