#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
__title__ = 'Multithreading test class'
@Time    : 2020/9/26 22:53
@Author  : Kenny-PC
@Software: PyCharm

Pull orders from the 818tu partner open API with a pool of 16 threads.

Layout of this module:
  * url_list  - builds one task string per configured account (task producer)
  * get_page  - executes one task: lists channels, then pages through orders
  * get_pool  - chooses between a process pool and a thread pool
  * open_pool - creates the pool, runs all tasks and prints the order total

# code is far away from bugs with the god animal protecting
    I love animals. They taste delicious.
              ┏┓      ┏┓
            ┏┛┻━━━┛┻┓
            ┃      ☃      ┃
            ┃  ┳┛  ┗┳  ┃
            ┃      ┻      ┃
            ┗━┓      ┏━┛
                ┃      ┗━━━┓
                ┃  Blessed by  ┣┓
                ┃  the beast:  ┏┛
                ┃  never a BUG!┃
                ┗┓┓┏━┳┓┏┛
                  ┃┫┫  ┃┫┫
                  ┗┻┛  ┗┻┛
"""

import ssl
from urllib import request, parse
# NOTE: the plain ``import datetime`` below must stay AFTER this line; the code
# uses ``datetime.datetime.strptime`` and therefore needs the module binding.
from datetime import datetime
from multiprocessing import Pool as ProcessPoll  # process pool
from multiprocessing.dummy import Pool as ThreadPool  # thread pool
import time
import datetime
import hashlib
import json

import requests

# NOTE(review): this replaces the default HTTPS context factory with the
# unverified one, i.e. certificate verification is switched off for every
# urllib HTTPS call in the process. It is kept because it is an existing
# module-level side effect, but it should be removed once the certificate
# problem it works around is fixed.
ssl._create_default_https_context = ssl._create_unverified_context

from util import date_util, platform_util
import account_list as al

zzy_order_list = ()

CHANNELS_URL = 'https://openapi.818tu.com/partners/channel/channels/list?'
ORDERS_URL = 'https://openapi.818tu.com/partners/channel/orders/list?'
ORDER_STATUS = '1'
ORDERS_PER_PAGE = '1000'
# Seconds before a stalled HTTP call gives up instead of blocking a worker forever.
REQUEST_TIMEOUT = 60
REQUEST_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36'
}


def md5value(s):
    """Return the hexadecimal MD5 digest of the UTF-8 encoding of *s*."""
    return hashlib.md5(s.encode("utf-8")).hexdigest()


def _orders_url(secret, key, channel_id, created_gt, created_lt, page=None):
    """Build a signed order-list URL for one channel.

    The signature is the MD5 of the secret followed by the raw (unencoded)
    parameters in alphabetical order; the query string carries the same
    parameters with the ``created_at[...]`` pairs URL-encoded. When *page* is
    None the ``page`` parameter is left out of both.
    """
    page_part = '' if page is None else '&page=' + str(page)
    sign = md5value(
        secret
        + 'channel_id=' + str(channel_id)
        + '&created_at[gt]=' + created_gt
        + '&created_at[lt]=' + created_lt
        + '&key=' + key
        + page_part
        + '&per_page=' + ORDERS_PER_PAGE
        + '&status=' + ORDER_STATUS
    )
    query = (
        'channel_id=' + str(channel_id)
        + '&' + parse.urlencode({'created_at[gt]': created_gt})
        + '&' + parse.urlencode({'created_at[lt]': created_lt})
        + page_part
        + '&per_page=' + ORDERS_PER_PAGE
        + '&status=' + ORDER_STATUS
        + '&key=' + key
        + '&sign=' + sign
    )
    return ORDERS_URL + query


def _order_row(order, channel, channel_id, stage):
    """Flatten one order dict from the API into a tuple of values.

    The values are ordered by field name: amount, channel, channel_id, date,
    from_novel, order_id, order_time, platform, reg_time, stage, user_id.
    """
    member = order['member']
    day = datetime.datetime.strptime(order['created_at'][0:10], "%Y-%m-%d")
    day_epoch = int(time.mktime(day.timetuple()))
    row = {
        'user_id': str(member['openid']),
        'channel': channel,
        'reg_time': member['created_at'],
        'channel_id': channel_id,
        # price is divided by 100 — presumably cents to currency units; confirm.
        'amount': round(order['price'] / 100, 2),
        'order_id': str(order['id']),
        'order_time': order['created_at'],
        'platform': '掌中云',
        'stage': stage,
        # Shift by 8h, floor to a whole day, shift back: the epoch second of
        # the day boundary at a fixed +8h offset.
        'date': ((day_epoch + 8 * 3600) // 86400) * 86400 - 8 * 3600,
    }
    if str(order['from_novel_id']) != 'None':
        row['from_novel'] = order['from_novel']['title']
    else:
        row['from_novel'] = 'None'
    return tuple(row[name] for name in sorted(row))


# Task execution
def get_page(task_q):
    """Fetch every order of every channel belonging to one account.

    Args:
        task_q: a task string produced by ``url_list``: the signed
            channel-list URL followed by ``&a=<key>&b=<secret>&c=<stage>``.

    Returns:
        A tuple of order rows (see ``_order_row``). Rows of the channel that
        was processed last come first.
    """
    qs = parse.parse_qs(task_q)
    my_key = str(qs['a'][0])
    secret = str(qs['b'][0])
    stage = qs['c'][0] if 'c' in qs else ''

    # Everything from '&a=' on is task metadata and must not be sent to the API.
    channels_url = task_q.split('&a=')[0]
    req = request.Request(channels_url, headers=REQUEST_HEADERS)
    with request.urlopen(req, timeout=REQUEST_TIMEOUT) as response:
        channel_list = json.loads(response.read().decode('utf8'))

    # The platform uses its own time format for queries, so convert first.
    st = platform_util.getZzyQueryTime(date_util.getYesterdayStartTime())
    et = platform_util.getZzyQueryTime(date_util.getYesterdayEndTime())

    if 'data' in channel_list:
        items = channel_list['data']['items']
    else:
        print('channel_list', channel_list)
        items = []

    per_page = int(ORDERS_PER_PAGE)
    channel_batches = []
    for item in items:
        # For each channel, read the order count and then fetch page by page.
        channel_id = item['id']
        channel = item['nickname']

        count_resp = requests.get(
            _orders_url(secret, my_key, channel_id, st, et),
            timeout=REQUEST_TIMEOUT,
        )
        total = count_resp.json()['data']['count']
        # Ceiling division: no request for a page that cannot hold any order.
        page_count = -(-total // per_page)

        rows = []
        for page in range(1, page_count + 1):
            page_resp = requests.get(
                _orders_url(secret, my_key, channel_id, st, et, page=page),
                timeout=REQUEST_TIMEOUT,
            )
            for order in page_resp.json()['data']['items']:
                rows.append(_order_row(order, channel, channel_id, stage))

        print('zzy_my_key_chanel', my_key, channel, len(rows))
        if rows:
            channel_batches.append(rows)

    # Later channels go first, matching the historical result order.
    key_child_order = tuple(
        row for rows in reversed(channel_batches) for row in rows
    )
    print('zzy_my_key:', my_key, ' 下的订单数量为:', len(key_child_order))
    return key_child_order


# Task producer
def url_list():
    """Build one task string per account in ``al.zzy_account_list``.

    Each account entry is read as ``(key, secret, stage)``. The task string is
    the signed channel-list URL with the credentials appended as
    ``&a=<key>&b=<secret>&c=<stage>`` so that ``get_page`` can recover them.

    Returns:
        A list of task strings.
    """
    task_list = []
    for account in al.zzy_account_list:
        raw_key = account[0]
        secret = account[1]
        stage = account[2]
        my_sign = md5value(secret + 'key=' + raw_key)
        # The key has to be percent-encoded by hand before it goes into the URL.
        my_key = parse.quote(raw_key)
        real_url = CHANNELS_URL + 'key=' + my_key + '&sign=' + my_sign
        # get_page reads these back with parse_qs, so they must be encoded too;
        # otherwise '+', '&', '%' or '=' in a secret would be corrupted.
        full_url = (
            real_url
            + '&a=' + my_key
            + '&b=' + parse.quote(secret, safe='')
            + '&c=' + parse.quote(stage, safe='')
        )
        task_list.append(full_url)
    return task_list


# Choose processes or threads
def get_pool(way=True, count=4):
    """Return a process pool when *way* is truthy, otherwise a thread pool.

    Args:
        way: truthy selects ``multiprocessing.Pool``, falsy selects
            ``multiprocessing.dummy.Pool``.
        count: number of workers.
    """
    return ProcessPoll(count) if way else ThreadPool(count)


# Entry point
def open_pool():
    """Run ``get_page`` for every task on 16 threads and print the order total."""
    pool = get_pool(way=False, count=16)
    try:
        results = pool.map(get_page, url_list())  # function, list or tuple
    finally:
        # Always release the workers, even when a task raised.
        pool.close()
        pool.join()
    total_count = sum(len(orders) for orders in results)
    print('掌中云中订单数据:', total_count)


if __name__ == '__main__':
    start_second_time = date_util.getCurrentSecondTime()
    open_pool()
    print('执行时间:', date_util.getCurrentSecondTime() - start_second_time)