test_multiprocessing.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. __title__ = '测试多线程类'
  5. @Time : 2020/9/26 22:53
  6. @Author : Kenny-PC
  7. @Software: PyCharm
  8. # code is far away from bugs with the god animal protecting
  9. I love animals. They taste delicious.
  10. ┏┓ ┏┓
  11. ┏┛┻━━━┛┻┓
  12. ┃ ☃ ┃
  13. ┃ ┳┛ ┗┳ ┃
  14. ┃ ┻ ┃
  15. ┗━┓ ┏━┛
  16. ┃ ┗━━━┓
  17. ┃ 神兽保佑 ┣┓
  18. ┃ 永无BUG! ┏┛
  19. ┗┓┓┏━┳┓┏┛
  20. ┃┫┫ ┃┫┫
  21. ┗┻┛ ┗┻┛
  22. """
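'''Crawl order information using 16 threads.

Contains a task-building function, a task-execution function, a switch
between process and thread pools, and a function that starts the pool.
'''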
  26. import requests
  27. from urllib import request
  28. import ssl
  29. ssl._create_default_https_context = ssl._create_unverified_context
  30. from datetime import datetime
  31. from multiprocessing import Pool as ProcessPoll # 进程池
  32. from multiprocessing.dummy import Pool as ThreadPool # 线程池
  33. from util import date_util, platform_util
  34. import account_list as al
  35. import time
  36. import datetime
  37. import hashlib
  38. from urllib import parse
  39. import json
# Module-level empty tuple; none of the functions visible in this section
# read or rebind it.
zzy_order_list = ()
  41. def md5value(s):
  42. md5 = hashlib.md5()
  43. md5.update(s.encode("utf-8"))
  44. return md5.hexdigest()
  45. # 任务执行
  46. def get_page(task_q):
  47. headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36'}
  48. # url action
  49. qs = parse.parse_qs(task_q)
  50. my_key = str(qs['a'][0])
  51. secert = str(qs['b'][0])
  52. stage = ''
  53. if 'c' in qs.keys():
  54. stage = qs['c'][0]
  55. strlist = task_q.split('&a=')
  56. task_q = strlist[0]
  57. req = request.Request(task_q, headers=headers)
  58. response = request.urlopen(req)
  59. responseBodyStr = response.read().decode('utf8')
  60. channel_list = json.loads(responseBodyStr)
  61. # 掌中云的时间格式比较特殊,转换下
  62. st = platform_util.getZzyQueryTime(date_util.getYesterdayStartTime())
  63. et = platform_util.getZzyQueryTime(date_util.getYesterdayEndTime())
  64. if 'data' in channel_list:
  65. items = channel_list['data']['items']
  66. # items = []
  67. else:
  68. print('channel_list', channel_list)
  69. items = []
  70. keyChildOrder = ()
  71. for item in items: # 获取channel_id 后逐个拉取历史orders
  72. r = ()
  73. channel_id = item['id']
  74. channel = item['nickname']
  75. status = str(1)
  76. per_page = str(1000)
  77. limit_time = et
  78. get_time = st
  79. lt = parse.urlencode({'created_at[lt]': limit_time})
  80. gt = parse.urlencode({'created_at[gt]': get_time})
  81. url_1 = 'https://openapi.818tu.com/partners/channel/orders/list?'
  82. my_sign_1 = md5value(secert + 'channel_id=' + str(
  83. channel_id) + '&created_at[gt]=' + get_time + '&created_at[lt]=' + limit_time + '&key=' + my_key + '&per_page=' + per_page + '&status=' + status)
  84. parameter_1 = 'channel_id=' + str(
  85. channel_id) + '&' + gt + '&' + lt + '&per_page=' + per_page + '&status=' + status + '&key=' + my_key + '&sign=' + my_sign_1
  86. orders = requests.get(url_1 + parameter_1)
  87. t = orders.json()['data']['count'] // int(per_page) + 1
  88. for page in range(1, t + 1):
  89. my_sign_2 = md5value(secert + 'channel_id=' + str(
  90. channel_id) + '&created_at[gt]=' + get_time + '&created_at[lt]=' + limit_time + '&key=' + my_key + '&page=' + str(
  91. page) + '&per_page=' + per_page + '&status=' + status)
  92. parameter_2 = 'channel_id=' + str(channel_id) + '&' + gt + '&' + lt + '&page=' + str(
  93. page) + '&per_page=' + per_page + '&status=' + status + '&key=' + my_key + '&sign=' + my_sign_2
  94. orders_1 = requests.get(url_1 + parameter_2)
  95. b = orders_1.json()['data']['items']
  96. for a in b:
  97. c = {}
  98. c['user_id'] = str(a['member']['openid'])
  99. c['channel'] = channel
  100. c['reg_time'] = a['member']['created_at']
  101. c['channel_id'] = channel_id
  102. c['amount'] = round(a['price'] / 100, 2)
  103. c['order_id'] = str(a['id'])
  104. c['order_time'] = a['created_at']
  105. c['platform'] = '掌中云'
  106. c['stage'] = stage
  107. # c['amount']=a['amount']
  108. dtime = datetime.datetime.strptime(a['created_at'][0:10], "%Y-%m-%d")
  109. c['date'] = ((int(time.mktime(dtime.timetuple())) + 8 * 3600) // 86400) * 86400 - 8 * 3600
  110. if str(a['from_novel_id']) != 'None':
  111. c['from_novel'] = a['from_novel']['title']
  112. else:
  113. c['from_novel'] = 'None'
  114. """
  115. del a['member']
  116. del a['referral_link_id']
  117. del a['id']
  118. del a['created_at']
  119. del a['paid_at']
  120. del a['border_id']
  121. del a['from_novel_id']
  122. del a['status']
  123. del a['price']
  124. del a['agent_uid']
  125. """
  126. x = sorted(c.items(), key=lambda item: item[0])
  127. x = dict(x)
  128. x = tuple(x.values())
  129. r = r + ((x),)
  130. print('zzy_my_key_chanel', my_key, channel, len(r))
  131. if len(r) > 0:
  132. keyChildOrder = r + keyChildOrder
  133. print('zzy_my_key:', my_key, ' 下的订单数量为:', len(keyChildOrder))
  134. return keyChildOrder
  135. # 任务添加
  136. def url_list():
  137. task_list = []
  138. API_list = al.zzy_account_list
  139. url = 'https://openapi.818tu.com/partners/channel/channels/list?'
  140. for x in API_list:
  141. my_key = x[0]
  142. secert = x[1]
  143. stage = x[2]
  144. my_sign = md5value(secert + 'key=' + my_key)
  145. #todo 这里一定要手动编码,坑爹的
  146. my_key = parse.quote(x[0])
  147. parameter = 'key=' + my_key + '&sign=' + my_sign
  148. real_url = url + parameter
  149. full_url = real_url + '&a=' + my_key + '&b=' + secert + '&c=' + stage
  150. task_list.append(full_url)
  151. return task_list
  152. # 设定进、线程
  153. def get_pool(way=True,count=4):
  154. if way:
  155. # 进程
  156. pool = ProcessPoll(count)
  157. else:
  158. # 线程
  159. pool = ThreadPool(count)
  160. return pool
  161. # 启动
  162. def open_pool():
  163. pool = get_pool(way=False,count=16)
  164. task_q = url_list()
  165. results = pool.map(get_page, task_q) # 函数, 列表或元组
  166. pool.close()
  167. pool.join()
  168. totalCount = 0
  169. if len(results) > 0:
  170. for item in results:
  171. totalCount = totalCount + len(item)
  172. print('掌中云中订单数据:', totalCount)
  173. if __name__ == '__main__':
  174. start_second_time = date_util.getCurrentSecondTime()
  175. open_pool()
  176. print('执行时间:', date_util.getCurrentSecondTime() - start_second_time)