#!/usr/bin/env python # -*- coding: utf-8 -*- # 20201028 ## 20201027新添加 阳光 平台接口 ########################## 阳光接口返回数据格式 ''' { "error_code": 0, ## 错误码 0 成功 "error_msg": "", ## 错误信息 "data":{ ## 数据内容 "last_id": 1, ## 最后一条数据的 ID 请求下一页数据时进行传递 "count": 500, ## 此次查询数据量 "push_time": 1570701536, ## 返回时间戳 "list": [ ## 数据列表 { "merchant_id": "20180627151621_1_UdyF", ## 商户订单号 "transaction_id": "4200000122201806277014647642", ## 交易单号 "type": "1", ## 订单类型,'1'=>书币充值 '2'=>VIP 充值 "money": "50", ## 总额 "state": "1", ## 订单状态,'0'=>未完成 '1' => 完成 "from": "0", ## "create_time": "1530083789", ## 下单时间 "finish_time": "1530083789", ## 完成时间 "book_name": "一品邪少", ## 书名 "book_tags": "现代都市", ## "referral_url": "/t/392109", ## 推广链接 "user_id": "112333", ## 用户 id "channel_id": "1231" ## 渠道 id user_createtime ## 用户注册时间 openid ## 用户 openid }, ] ''' ########################## import random import datetime import hashlib import math import time from concurrent.futures import ProcessPoolExecutor from urllib import parse import requests from util import date_util from util import platform_config_util ## 账号配置 from util import robust_util from apscheduler.schedulers.blocking import BlockingScheduler from util.MySQLConnection import MySQLConnection def md5(s): md5 = hashlib.md5() md5.update(s.encode("utf-8")) return md5.hexdigest() def sha1(s): sha1 = hashlib.sha1() sha1.update(s.encode("utf-8")) return sha1.hexdigest() def get_random_str(num=5): H = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789' salt = '' for i in range(num): salt += random.choice(H) return salt def getSelfDateStr(times=time.time(),date_format='%Y%m%d'): """ ## 20201028添加,阳光接口,日期参数请求格式20201028,一日一拉api数据 description: 获取指定时间戳 time: 秒 默认当前时间 return: 返回指定时间戳的前一日日期 。 比如 :接收20190512号的时间戳,返回 20190513 -> str tips: 一天86400秒 """ timestamps = str(time.strftime(date_format,time.localtime(times))) return timestamps def get_yangguang_account_list(): """ des cription: 阳光账号列表 return: [['host_name', 'channel_id', 'secert_key', 'channel', 'stage']] ->list """ return platform_config_util.get_account_list('阳光', 'yangguang_account_config.csv') @robust_util.catch_exception def get_yangguang_order(st,et): total_order_list = () start_exec_seconds = date_util.getCurrentSecondTime() account_list = get_yangguang_account_list() executor = ProcessPoolExecutor(max_workers=5) futures = [] for account in account_list: future = executor.submit(get_yangguang_order_task, st, et, account) futures.append(future) executor.shutdown(True) for future in futures: order_list = future.result() if len(order_list) > 0: total_order_list = order_list + total_order_list print('阳光订单数量:', len(total_order_list), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds) return total_order_list def get_yangguang_order_task(st,et,account): order_list = () url = 'https://api.yifengaf.cn:443/api/channeldataapi/orders' url_frequency = 1 ##接口鉴权参数 client_id = account[0] token = account[1] ##订单接口参数必填 channel_id = account[3] ##需要保存的参数 stage = account[2] channel = account[4] for i in range((et-st)//86400 + 1): last_id = 111 statis_unix_time = st + (i-1)*86400 search_date = getSelfDateStr(statis_unix_time,"%Y%m%d") while True: if st == et: break if url_frequency//30 == 0 : time.sleep(61) url_frequency += 1 nonce = get_random_str() timestamp = int(time.time()) signaure = str(token)+str(timestamp)+str(client_id)+str(nonce) signaure = sha1(signaure) params = { ## 授权url参数 'client_id': client_id, 'token': token, 'nonce': nonce, 'timestamp': timestamp, 'signaure': signaure, ## 订单url参数,那个渠道,那天的数据 'channel_id': channel_id, 'search_date': search_date, } response_result_json = requests.get(url=url, params=params).json() code = response_result_json['error_code'] if code != 0: # error_msg = response_result_json['error_msg'] # print("阳光查询充值接口错误信息:",error_msg) print("阳光异常vip公众号:",channel,"所在期数:",stage) print('阳光查询充值接口异常:',response_result_json,'传入参数', params) break result_data = response_result_json['data'] ''' # json['data']返回数据案例 {'last_id': 108847045, 'count': 4, 'push_time': 1604303106, # 'list': [{'user_id': '531307203', 'merchant_id': '20201101111026_531307203_lYzh', 'transaction_id': '', 'type': '1', 'money': '18.00', # 'state': '0', 'from': '0', 'create_time': '1604200226', 'finish_time': '', 'referral_id': '3515889', 'referral_id_permanent': '3515894', # 'channel_id': '26885', 'book_name': '猛虎出山', 'book_tags': '都市', 'referral_url': '/t/3515889', 'subscribe_time': '1603687515', # 'user_createtime': '1603687515', 'openid': 'om90f6EKBtwcNo9gcgISKsGNyk5o', 'register_ip': '', 'nickname': '爱拼才会赢(星辉电镀化工)', 'avatar': url连接 } ]} ''' page_count = result_data['count'] if page_count == 0: break if last_id == result_data['last_id']: break last_id = result_data['last_id'] params['last_id'] = result_data['last_id'] order_item_list = result_data['list'] for order_item in order_item_list: if order_item['state'] != '1': continue order = {} order['amount'] = order_item['money'] order['channel_id'] = order_item['channel_id'] order['order_id'] = str(order_item['transaction_id']) order['order_time'] = date_util.getSecondsToDatetime(int(order_item['create_time']),"%Y/%m/%d %H:%M:%S") ## 原数据时间戳,转日期默认=%Y-%m-%d %H:%M:%S order['user_id'] = order_item['user_id'] order['platform'] = '阳光' order['channel'] = channel order['reg_time'] = date_util.getSecondsToDatetime(int(order_item['user_createtime']),"%Y/%m/%d %H:%M:%S") ## 原数据时间戳,转日期默认=%Y-%m-%d %H:%M:%S order['from_novel'] = order_item['book_name'] ## str order['stage'] = stage ## str order['date'] = ( (int(order_item['create_time']) + 0*3600 ) // 86400) * 86400 ## 时间戳 x = sorted(order.items(), key=lambda item: item[0]) x = dict(x) x = tuple(x.values()) order_list = order_list + ((x),) print(f"数据日期-{search_date}-公众号-{channel}-获取数据-{len(order_list)}条,例如》 {order_list[0:1]}") return order_list def batch_save_order(data): if data is None or len(data) == 0: print('数据为空,不执行数据库操作!') else: sql = 'INSERT IGNORE INTO quchen_text.order_zwg(amount,channel,channel_id,date,from_novel,order_id,order_time,platform,reg_time,stage,user_id) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);' connect = MySQLConnection() try: num = connect.batch(sql, data) # 提交 connect.commit() print('订单数据最终入库【{num}】条'.format(num=num)) except Exception as e: print('订单数据入库失败:', e) finally: connect.close() def start_order_job(): start_exec_seconds = date_util.getCurrentSecondTime() st_unix, et_unix = date_util.getPreviousHourAndCurrentHourSecondTime(start_exec_seconds) # st_unix = 1602313200 # 2020/10/10 15:0:0 # et_unix = 1602316800 # 2020/10/10 16:0:0 #print('查询开始时间:', st_unix, date_util.getSecondsToDatetime(st_unix)) #print('查询结束时间:', et_unix, date_util.getSecondsToDatetime(et_unix)) ## 20201028添加阳光平台 batch_save_order(get_yangguang_order(st_unix, et_unix)) print('订单同步执行时间(秒):', date_util.getCurrentSecondTime() - start_exec_seconds) start_order_job() ''' start_job_time = '2020-11-03 10:04:00' if __name__ == '__main__': scheduler = BlockingScheduler() scheduler.add_job(start_order_job, 'interval', days =1 ,start_date=start_job_time) #scheduler.add_job(start_order_job, 'interval',days =1,hours = 2,minutes = 0,seconds = 0) #线上是24h执行一次 scheduler.start() '''