
import base64
import json
import os
import time
from concurrent.futures import ThreadPoolExecutor, wait

import cv2
import numpy as np
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
# Directory that holds the source .jpg pictures.
# Backslashes are escaped: in a normal string literal "\U" starts a unicode
# escape, so the unescaped form 'C:\Users\...' does not even compile.
sourceDataDir = 'C:\\Users\\yuyang2\\Pictures\\jyy'
# Sorted file names (not full paths) of every entry in sourceDataDir ending in '.jpg'.
sourceDataNameList = sorted([pic for pic in os.listdir(sourceDataDir) if pic.endswith('.jpg')])
# First picture in sorted order; raises IndexError here if no .jpg was found.
picFirstName = sourceDataNameList[0]
# Directory the generated pictures are written to.
dstDataDir = 'C:\\Users\\yuyang2\\Pictures\\jyy_dataDst'
def cv2bytes(im):
    """Encode a cv2 image as PNG and return the encoded data as bytes.

    :param im: cv2 image, numpy.ndarray
    :return: binary picture data, bytes
    """
    # Only element [1] of the imencode result is used; element [0] is ignored.
    encoded = cv2.imencode('.png', im)[1]
    png_buffer = np.array(encoded)
    return png_buffer.tobytes()
def getImg_b(path):
    """Return the complete contents of the file at *path* as bytes.

    :param path: path of the file to read
    :return: the file contents, bytes
    """
    # Binary mode: the data is returned exactly as stored, with no decoding.
    with open(path, "rb") as source:
        return source.read()
def getStyle(input_list):
    """Build a lookup from effect id to effect name.

    :param input_list: iterable of mappings, each with an 'effectId' and an
        'effectName' entry
    :return: dict mapping every 'effectId' to its 'effectName'; when an id
        occurs more than once the last entry wins
    """
    return {entry['effectId']: entry['effectName'] for entry in input_list}
def checkDir(dir_path):
    """Create *dir_path* (and any missing parent directories) if it is absent.

    :param dir_path: directory that must exist after the call
    :return: None
    """
    if not os.path.exists(dir_path):
        # Create the directory that was asked for, not a module-level default.
        # exist_ok covers the case where it appears between the check and here.
        os.makedirs(dir_path, exist_ok=True)
class AESUtil:
    """AES-CBC helper that combines PKCS#7 padding with base64 encoding.

    :param key: AES key as text; it is UTF-8 encoded before use
    :param iv: CBC initialisation vector as text; it is UTF-8 encoded before use
    """

    def __init__(self, key, iv):
        # Padding block size, taken from the AES block size.
        self.pad_length = AES.block_size
        self.key = key
        self.iv = iv

    def _new_cipher(self):
        """Return a fresh CBC cipher; one is built for every operation."""
        return AES.new(self.key.encode('utf-8'), AES.MODE_CBC, self.iv.encode('utf-8'))

    def encryt(self, data):
        """Pad, encrypt and base64-encode *data*.

        :param data: binary picture data, bytes
        :return: the AES-encrypted, base64-encoded data, bytes
        """
        padded = pad(data, self.pad_length, style='pkcs7')
        encrypted = self._new_cipher().encrypt(padded)
        return base64.b64encode(encrypted)

    def decrypt(self, enStr):
        """Reverse :meth:`encryt` and return the plaintext as text.

        :param enStr: base64-encoded ciphertext
        :return: the decrypted payload decoded as UTF-8, str
        :raises ValueError: if the decrypted data does not end in valid
            PKCS#7 padding (raised by ``unpad``)
        """
        encrypted = base64.b64decode(enStr)
        padded = self._new_cipher().decrypt(encrypted)
        # Remove exactly the PKCS#7 padding added by encryt. Stripping 0x0f
        # bytes is only right for 15-byte padding and can also eat real data
        # bytes at either end.
        plain = unpad(padded, self.pad_length, style='pkcs7')
        return plain.decode('utf-8')
class BLing:
    """HTTP client for the effect-list and algorithm (GAN) endpoints.

    A request body is first prepared with one of the ``set*_BodyData``
    methods, which store it on the instance; the matching ``get_*_URL``
    method then POSTs the stored body.  Because the pending body lives on
    the instance, a single object must not be shared by threads that set
    and send concurrently.
    """

    # (connect, read) timeout in seconds applied to every POST.  Without a
    # timeout a stalled server blocks the caller forever.  Override on the
    # class or on an instance if the algorithm endpoint needs longer.
    REQUEST_TIMEOUT = (10, 120)

    def __init__(self):
        # The flag must be set before init_URL(), which reads it.
        self.FOR_BETA = True
        self.init_URL()
        # Pending JSON bodies; filled in by the set*_BodyData methods.
        self.GROUP_body = None
        self.MATERIAL_data = None
        self.GAN_data = None
        # NOTE(review): these values look redacted in this copy of the file;
        # confirm the real key/iv before use.
        self.aes_key = "s***t@2021*"
        self.aes_iv = "s***t@2021*"
        # Headers sent with every request.
        self.headers = {
            'User-Agent': 'GAN/1.0.0 (com.sensemars.gan; build:1; iOS 14.7.1) Alamofire/5.4.4',
            'Content-Type': 'application/json; charset=utf-8',
            'Accept-Encoding': 'br;q=1.0, gzip;q=0.9, deflate;q=0.8'
        }
        self.session = requests.session()

    def init_URL(self):
        """Set the three endpoint URLs according to ``self.FOR_BETA``."""
        if self.FOR_BETA:
            self.GROUP_LIST_URL = "http://*/effect/manage/v1/api/getGroupList"
            self.MATERIAL_LIST_URL = "http://*/effect/manage/v1/api/getEffectList"
            self.GAN_URL = "http://*/algorithm/v1/sync/tasks"
        else:
            self.GROUP_LIST_URL = "https://api-bling.softsugar.com/effect/manage/v1/api/getGroupList"
            self.MATERIAL_LIST_URL = "https://api-bling.softsugar.com/effect/manage/v1/api/getEffectList"
            self.GAN_URL = "https://sf.softsugar.com:30380/algorithm/v1/sync/tasks"

    def get_ase_img(self, img_b):
        """Encrypt *img_b* with ``AESUtil`` using this client's key and iv.

        :param img_b: binary picture data, bytes
        :return: whatever ``AESUtil.encryt`` returns for *img_b*
        """
        return AESUtil(key=self.aes_key, iv=self.aes_iv).encryt(img_b)

    def setGROUP_BodyData(self, page="", size=""):
        """Store the JSON body for the group-list request."""
        self.GROUP_body = json.dumps({"page": page, "size": size})

    def setMATERIA_BodyData(self, groupId="f939be78ccc04160a2a983c9f7506bb3", page="", size=""):
        """Store the JSON body for the effect-list request."""
        self.MATERIAL_data = json.dumps({"groupId": groupId, "page": page, "size": size})

    def setGAN_BodyData(self, app_id, img_b):
        """Store the JSON body for the algorithm request.

        :param app_id: value sent as ``app_id``
        :param img_b: binary picture data; it is encrypted via
            :meth:`get_ase_img` and sent as text
        """
        self.GAN_data = json.dumps({"app_id": app_id, "requests": [{"data": self.get_ase_img(img_b).decode()}]})

    def setUpBLingBodyData(self):
        """Prepare the group-list and effect-list bodies with their defaults."""
        self.setGROUP_BodyData()
        self.setMATERIA_BodyData()

    def _post(self, url, body):
        """POST *body* to *url* with the shared headers and timeout."""
        return self.session.post(url, headers=self.headers, data=body, timeout=self.REQUEST_TIMEOUT)

    def get_GROUP_LIST_URL(self):
        """POST the stored group-list body and return the response body as text."""
        return self._post(self.GROUP_LIST_URL, self.GROUP_body).content.decode()

    def get_MATERIAL_LIST_URL(self):
        """POST the stored effect-list body and return the parsed JSON response."""
        return self._post(self.MATERIAL_LIST_URL, self.MATERIAL_data).json()

    def get_GAN_URL(self):
        """POST the stored algorithm body and return the parsed JSON response."""
        return self._post(self.GAN_URL, self.GAN_data).json()
# with open('data.json', 'r', encoding='utf-8') as f:
# json_data = json.load(f)
def get_BLing_data(pic_name):
    """Render one source picture in every known style and save each result.

    Each style in ``effectIdDict`` is requested from the algorithm endpoint
    and the returned picture is written to ``dstDataDir`` as
    ``<style_name>_<pic_name>``.  The first error is printed and ends the
    processing of this picture.

    :param pic_name: file name of the picture inside ``sourceDataDir``
    :return: None
    """
    # Use a private client per call.  BLing keeps the pending request body on
    # the instance (set, then send), and this function runs in pool workers:
    # with a shared client one worker can post the body another just stored.
    client = BLing()
    try:
        # The picture is the same for every style, so read it only once.
        img_b = getImg_b(os.path.join(sourceDataDir, pic_name))
        for effectId, style_name in effectIdDict.items():
            print("正在处理图片:%s, style: %s" % (pic_name, style_name))
            client.setGAN_BodyData(int(effectId), img_b)
            response_GAN = client.get_GAN_URL()
            print(response_GAN["results"][0]["code"])
            # The picture comes back base64-encoded under results[0].data.image.
            img_data = response_GAN["results"][0]["data"]['image']
            img_data = base64.b64decode(img_data)
            save_name = os.path.join(dstDataDir, style_name + '_' + pic_name)
            with open(save_name, 'wb') as f:
                f.write(img_data)
    except Exception as e:
        # Best effort: report the failure and give up on this picture only.
        print(e)
    finally:
        client.session.close()
    return
def test_BLing_Single():
    """Sequentially render every source picture in every style.

    Styles come from ``effectIdDict`` and pictures from
    ``sourceDataNameList``; each result is written to ``dstDataDir`` as
    ``<style_name>_<pic_name>``.  The first error is printed and stops the
    whole run.
    """

    def render(effect_id, style, picture):
        # One request/response round trip for a single (style, picture) pair.
        print("正在处理图片:%s, style: %s" % (picture, style))
        raw = getImg_b(os.path.join(sourceDataDir, picture))
        bLing.setGAN_BodyData(int(effect_id), raw)
        reply = bLing.get_GAN_URL()
        first_result = reply["results"][0]
        print(first_result["code"])
        # The picture comes back base64-encoded under results[0].data.image.
        decoded = base64.b64decode(first_result["data"]['image'])
        target = os.path.join(dstDataDir, style + '_' + picture)
        with open(target, 'wb') as out:
            out.write(decoded)

    # Call the algorithm endpoint and generate the data in bulk.
    for effectId, style_name in effectIdDict.items():
        try:
            for pic_name in sourceDataNameList:
                render(effectId, style_name, pic_name)
        except Exception as err:
            print(err)
            break
def test_BLing_Multi():
    """Process every source picture with ``get_BLing_data`` on 4 worker threads.

    Returns once all submitted tasks have finished.
    """
    # The with-block shuts the pool down on exit, so the worker threads are
    # released even if submitting or waiting raises.
    with ThreadPoolExecutor(4) as executor:
        # Call the algorithm endpoint and generate the data in bulk.
        all_task = [executor.submit(get_BLing_data, pic_name) for pic_name in sourceDataNameList]
        wait(all_task)
def get_html(times):
    """Stand-in for a slow network request.

    Sleeps for a fixed 3 seconds, then prints a completion message that
    contains *times*.  The argument only appears in the message; it does not
    change the delay.

    :param times: value shown in the printed message
    :return: None
    """
    time.sleep(3)
    message = "get page {}s finished".format(times)
    print(message)
urls = range(10)  # placeholder work items, not real URLs


def multiThreads():
    """Run ``get_html`` once per item in ``urls`` on 4 worker threads.

    Returns once all submitted tasks have finished.
    """
    # The with-block shuts the pool down on exit and releases its threads.
    with ThreadPoolExecutor(4) as executor:
        # submit() takes the call's arguments directly; wrapping them in a
        # tuple would hand get_html the tuple itself as its argument.
        all_task = [executor.submit(get_html, url) for url in urls]
        wait(all_task)
if __name__ == "__main__":
    # Read the first source picture as binary data.
    first_picture_path = os.path.join(sourceDataDir, picFirstName)
    image_b = getImg_b(first_picture_path)
    # Make sure the output directory exists.
    checkDir(dstDataDir)
    # Create the client and prepare the POST bodies of the list requests.
    bLing = BLing()
    bLing.setUpBLingBodyData()
    # response_GROUP_LIST = bLing.get_GROUP_LIST_URL()
    # print(response_GROUP_LIST)
    # The effect-list response provides the app_id -> style mapping.
    response_MATERIAL_LIST = bLing.get_MATERIAL_LIST_URL()
    effect_records = response_MATERIAL_LIST['data']['records']
    # Build the style lookup.
    effectIdDict = getStyle(effect_records)
    print(effectIdDict)
    # Single-threaded run.
    test_BLing_Single()
    # Multi-threaded run.
    # test_BLing_Multi()
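# --- Web page footer captured together with the snippet; not part of the program ---
# Sharing is welcome; when reposting, please credit the source: Neicun Yichu (OutOfMemory).
# Scan with WeChat
# Scan with Alipay
# Comment list (0 comments)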