123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- __title__ = '测试多线程类'
- @Time : 2020/9/26 22:53
- @Author : Kenny-PC
- @Software: PyCharm
- # code is far away from bugs with the god animal protecting
- I love animals. They taste delicious.
- ┏┓ ┏┓
- ┏┛┻━━━┛┻┓
- ┃ ☃ ┃
- ┃ ┳┛ ┗┳ ┃
- ┃ ┻ ┃
- ┗━┓ ┏━┛
- ┃ ┗━━━┓
- ┃ 神兽保佑 ┣┓
- ┃ 永无BUG! ┏┛
- ┗┓┓┏━┳┓┏┛
- ┃┫┫ ┃┫┫
- ┗┻┛ ┗┻┛
- """
- ''' 使用16线程爬取信息
- 任务添加函数、任务执行函数;进程、线程切换函数;进、线程开启函数;
- '''
- import ssl
- from urllib import request, parse
- import requests
- ssl._create_default_https_context = ssl._create_unverified_context
- from datetime import datetime
- from multiprocessing import Pool as ProcessPoll # 进程池
- from multiprocessing.dummy import Pool as ThreadPool # 线程池
- import time
- import datetime
- import hashlib
- import json
- from util import date_util, platform_util
- import account_list as al
- zzy_order_list = ()
def md5value(s):
    """Return the hexadecimal MD5 digest of ``s`` encoded as UTF-8.

    :param s: text to hash (``str``).
    :return: 32-character lowercase hex digest.
    """
    return hashlib.md5(s.encode("utf-8")).hexdigest()
- # 任务执行
def _zzy_signed_query(secret, params):
    """Build a signed, URL-encoded query string for the order-list endpoint.

    The signature is the MD5 of ``secret`` followed by the raw (un-encoded)
    ``name=value`` pairs sorted by name and joined with ``&``.

    :param secret: account secret used as the signature prefix.
    :param params: list of ``(name, value)`` string pairs, in the order they
        should appear in the query string (must include ``key``).
    :return: ``name=value&...&sign=<md5>`` with every value percent-encoded.
    """
    canonical = '&'.join(name + '=' + value for name, value in sorted(params))
    sign = md5value(secret + canonical)
    return parse.urlencode(list(params) + [('sign', sign)])


def _zzy_order_row(order, channel, channel_id, stage):
    """Flatten one order dict from the API into an 11-field tuple.

    Fields are laid out in alphabetical order of their names: amount,
    channel, channel_id, date, from_novel, order_id, order_time, platform,
    reg_time, stage, user_id.
    """
    member = order['member']
    created_at = order['created_at']

    # Epoch seconds of 00:00 (UTC+8) on the order's creation date. Computed
    # with an explicit offset so the result does not depend on the timezone
    # of the machine running the crawler.
    day = datetime.datetime.strptime(created_at[0:10], "%Y-%m-%d")
    utc_plus_8 = datetime.timezone(datetime.timedelta(hours=8))
    date_epoch = int(day.replace(tzinfo=utc_plus_8).timestamp())

    if order['from_novel_id'] is not None:
        from_novel = order['from_novel']['title']
    else:
        from_novel = 'None'

    return (
        round(order['price'] / 100, 2),  # amount (price / 100, 2 decimals)
        channel,                         # channel
        channel_id,                      # channel_id
        date_epoch,                      # date
        from_novel,                      # from_novel
        str(order['id']),                # order_id
        created_at,                      # order_time
        '掌中云',                         # platform
        member['created_at'],            # reg_time
        stage,                           # stage
        str(member['openid']),           # user_id
    )


def get_page(task_q):
    """Fetch every order of every channel belonging to one account.

    ``task_q`` is a task URL as produced by ``url_list``: the signed
    channel-list URL followed by ``&a=<key>&b=<secret>&c=<stage>``. The
    trailing parameters carry the credentials to the worker and are stripped
    before the channel list is requested.

    The order query window is bounded by
    ``date_util.getYesterdayStartTime()`` / ``getYesterdayEndTime()``
    (presumably the previous day; confirm against ``date_util``).

    :param task_q: task URL string.
    :return: tuple of order rows (see ``_zzy_order_row``). Rows of
        later channels come before rows of earlier channels; within a channel
        rows keep API order.
    :raises KeyError: if an order-list response has no ``data`` payload.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36'
    }

    # Recover the credentials that were appended to the task URL.
    qs = parse.parse_qs(task_q)
    my_key = str(qs['a'][0])
    secert = str(qs['b'][0])
    stage = qs.get('c', [''])[0]

    # Everything before '&a=' is the real, signed channel-list URL.
    channels_url = task_q.split('&a=')[0]
    req = request.Request(channels_url, headers=headers)
    with request.urlopen(req) as response:
        channel_list = json.loads(response.read().decode('utf8'))

    # The platform uses its own time format for queries; convert first.
    st = platform_util.getZzyQueryTime(date_util.getYesterdayStartTime())
    et = platform_util.getZzyQueryTime(date_util.getYesterdayEndTime())

    if 'data' in channel_list:
        items = channel_list['data']['items']
    else:
        print('channel_list', channel_list)
        items = []

    orders_url = 'https://openapi.818tu.com/partners/channel/orders/list?'
    status = '1'
    per_page = '1000'

    rows_by_channel = []
    for item in items:  # pull the orders of each channel in turn
        channel_id = item['id']
        channel = item['nickname']
        head_params = [
            ('channel_id', str(channel_id)),
            ('created_at[gt]', st),
            ('created_at[lt]', et),
        ]
        tail_params = [
            ('per_page', per_page),
            ('status', status),
            ('key', my_key),
        ]

        # First request (no page parameter) is only used to learn the count.
        count_query = _zzy_signed_query(secert, head_params + tail_params)
        total = requests.get(orders_url + count_query).json()['data']['count']
        # NOTE(review): assumes 'count' is the total number of matching
        # orders; this formula may request one trailing empty page when the
        # total is an exact multiple of per_page (kept as in the original).
        page_count = total // int(per_page) + 1

        rows = []
        for page in range(1, page_count + 1):
            page_query = _zzy_signed_query(
                secert, head_params + [('page', str(page))] + tail_params)
            page_items = requests.get(orders_url + page_query).json()['data']['items']
            rows.extend(
                _zzy_order_row(order, channel, channel_id, stage)
                for order in page_items
            )

        print('zzy_my_key_chanel', my_key, channel, len(rows))
        if rows:
            rows_by_channel.append(rows)

    # Each channel's rows go in front of those collected before it.
    keyChildOrder = tuple(
        row for rows in reversed(rows_by_channel) for row in rows
    )
    print('zzy_my_key:', my_key, ' 下的订单数量为:', len(keyChildOrder))
    return keyChildOrder
- # 任务添加
def url_list():
    """Build one task URL per account in ``al.zzy_account_list``.

    Each account entry is indexed as ``(key, secret, stage)``. The task URL is
    the signed channel-list URL followed by ``&a=<key>&b=<secret>&c=<stage>``;
    ``get_page`` reads those trailing parameters back with ``parse_qs`` and
    strips them before calling the API.

    :return: list of task URL strings, one per account, in list order.
    """
    url = 'https://openapi.818tu.com/partners/channel/channels/list?'
    task_list = []
    for account in al.zzy_account_list:
        my_key = account[0]
        secert = account[1]
        stage = account[2]

        # The signature is computed over the raw (un-encoded) key.
        my_sign = md5value(secert + 'key=' + my_key)

        # The key must be percent-encoded by hand before going into the URL.
        quoted_key = parse.quote(my_key)
        real_url = url + 'key=' + quoted_key + '&sign=' + my_sign

        # Encode secret and stage as well: the worker decodes them with
        # parse_qs, which would otherwise mangle '+', '&', '%' or '#'.
        full_url = (
            real_url
            + '&a=' + quoted_key
            + '&b=' + parse.quote(secert, safe='')
            + '&c=' + parse.quote(stage, safe='')
        )
        task_list.append(full_url)
    return task_list
- # 设定进、线程
def get_pool(way=True, count=4):
    """Create a worker pool with ``count`` workers.

    :param way: truthy selects a process pool (``multiprocessing.Pool``),
        falsy selects a thread pool (``multiprocessing.dummy.Pool``).
    :param count: number of workers.
    :return: the newly created pool.
    """
    pool_factory = ProcessPoll if way else ThreadPool
    return pool_factory(count)
- # 启动
def open_pool():
    """Run ``get_page`` over every task URL on a 16-thread pool.

    Prints the total number of order rows returned across all accounts.
    The pool is always closed and joined, even when a worker raises.
    """
    task_q = url_list()
    pool = get_pool(way=False, count=16)
    try:
        # map(function, list-or-tuple): one get_page call per task URL.
        results = pool.map(get_page, task_q)
    finally:
        pool.close()
        pool.join()

    totalCount = sum(len(item) for item in results)
    print('掌中云中订单数据:', totalCount)
if __name__ == '__main__':
    # Script entry point: run the crawl and report how long it took
    # (difference of two date_util.getCurrentSecondTime() readings).
    started_at = date_util.getCurrentSecondTime()
    open_pool()
    elapsed = date_util.getCurrentSecondTime() - started_at
    print('执行时间:', elapsed)
|