Preface
A bizarre requirement came up recently:
download every image stored on UPYUN (又拍云), roughly 10 TB, around 300 million files.
I asked UPYUN whether we could mail them a hard drive and have the data copied directly. The answer was no, but they did provide a Python download script:
download_file_with_iter.py
I downloaded it gratefully, only to find it pulled fewer than 3 images per second. Far too slow, so I decided to write my own.
Version 1
This is the version from when I first started: it is built directly on asyncio queues.
Environment: Python 3.6.2
Required packages (asyncio itself ships with Python 3.6, so only these need installing):
pip install aiohttp
pip install aiofiles
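Before the full script, here is a stripped-down sketch (my own illustration, not part of the original script) of the pattern it is built around: a shared asyncio.Queue, a fixed pool of consumer tasks, queue.join() to wait for the work to drain, and then cancelling the workers.

import asyncio

async def worker(queue):
    # consumer: keep pulling items until cancelled
    try:
        while True:
            item = await queue.get()
            print("processing", item)  # the real script calls the UPYUN API / saves a file here
            queue.task_done()
    except asyncio.CancelledError:
        pass

async def main(loop):
    queue = asyncio.Queue(loop=loop)
    for i in range(5):
        queue.put_nowait(i)  # producer: seed the queue
    workers = [asyncio.ensure_future(worker(queue), loop=loop) for _ in range(2)]
    await queue.join()  # returns once every item has been marked task_done()
    for w in workers:
        w.cancel()

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()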
import asyncio
import base64
import os
import urllib
import aiohttp
import aiofiles
# -----------------------
# -----------------------
bucket = "bucket_name" #这里就是空间名称
username = "username" #操作员账号
password = "password" #操作员密码
#空间外联地址 因为又拍云的http下载没有频率限制,所以使用http下载 不适用restful的api接口下载
hostname = "http://xxxxx"
# 这里是本地保存的根路径 这样下载后路径地址就跟空间内的地址是相对的了
base_save_path = "f:"
# -----------------------
headers = {}
auth = base64.b64encode(f"{username}:{password}".encode(encoding="utf-8")).decode("utf-8")
headers["Authorization"] = "Basic " + auth  # UPYUN basic-auth header
headers["User-Agent"] = "UPYUN_DOWNLOAD_SCRIPT"
headers["x-list-limit"] = "300"
thread_sleep = 1
def is_dic(url):
    """Guess whether a key is a directory: keys without a file extension are treated as directories"""
    # print(f"checking url: {url}")
    url = url.replace("http://v0.api.upyun.com/", "")
    return len(url.split(".")) == 1
class Crawler:
    def __init__(self, init_key, hostname, max_tasks=10, pic_task=50):
        """Initialize the crawler"""
        self.loop = asyncio.get_event_loop()
        self.max_tries = 4  # max retries per request
        self.max_tasks = max_tasks  # number of listing (API) worker tasks
        self.key_queue = asyncio.Queue(loop=self.loop)  # directory-listing queue
        self.pic_queue = asyncio.Queue(loop=self.loop)  # image-download queue
        self.session = aiohttp.ClientSession(loop=self.loop)  # shared async HTTP session
        self.key_queue.put_nowait(
            {"key": init_key, "x-list-iter": None, "hostname": hostname})  # seed the listing queue with the directory to download
        self.pic_task = pic_task  # number of image-download worker tasks
    def close(self):
        """Release the HTTP session"""
        self.session.close()
    async def work(self):
        """Consumer for the directory-listing queue"""
try:
while True:
url = await self.key_queue.get()
# print("key队列数量:" + await self.key_queue.qsize())
await self.handle(url)
self.key_queue.task_done()
await asyncio.sleep(thread_sleep)
except asyncio.CancelledError:
pass
    async def work_pic(self):
        """Consumer for the image-download queue"""
try:
while True:
url = await self.pic_queue.get()
await self.handle_pic(url)
self.pic_queue.task_done()
await asyncio.sleep(thread_sleep)
except asyncio.CancelledError:
pass
    async def handle_pic(self, key):
        """Fetch one image over HTTP and save it to disk"""
url = (lambda x: x[0] == "/" and x or "/" + x)(key["key"])
url = url.encode("utf-8")
url = urllib.parse.quote(url)
pic_url = key["hostname"] + url + "!s400"
tries = 0
while tries < self.max_tries:
try:
print(f"请求图片:{pic_url}")
async with self.session.get(pic_url, timeout=60) as response:
async with aiofiles.open(key["save_path"], "wb") as f:
# print("保存文件:{}".format(key["save_path"]))
await f.write(await response.read())
break
            except (aiohttp.ClientError, asyncio.TimeoutError):  # timeouts should also trigger a retry
pass
tries += 1
    async def handle(self, key):
        """Request one page of a directory listing from the UPYUN REST API"""
url = "/" + bucket + \
(lambda x: x[0] == "/" and x or "/" + x)(key["key"])
url = url.encode("utf-8")
url = urllib.parse.quote(url)
if key["x-list-iter"] is not None:
if key["x-list-iter"] is not None or not "g2gCZAAEbmV4dGQAA2VvZg":
headers["X-List-Iter"] = key["x-list-iter"]
tries = 0
while tries < self.max_tries:
try:
reque_url = "http://v0.api.upyun.com" + url
print(f"请求接口:{reque_url}")
async with self.session.get(reque_url, headers=headers, timeout=60) as response:
content = await response.text()
                    # pagination marker for the next page; fall back to the end-of-list marker when absent
                    iter_header = response.headers.get("x-upyun-list-iter", "g2gCZAAEbmV4dGQAA2VvZg")
list_json_param = content + "`" + \
str(response.status) + "`" + str(iter_header)
await self.do_file(self.get_list(list_json_param), key["key"], key["hostname"])
break
            except (aiohttp.ClientError, asyncio.TimeoutError):  # timeouts should also trigger a retry
pass
tries += 1
    def get_list(self, content):
        """Split the raw response into [file entries..., status code, iter marker]"""
        # print(content)
        if content:
            content = content.split("`")
            items = content[0].split("\n")
            content = [dict(zip(["name", "type", "size", "time"], x.split("\t")))
                       for x in items] + content[1].split() + content[2].split()
return content
else:
return None
    async def do_file(self, list_json, key, hostname):
        """Walk one page of listing results: queue subdirectories and image downloads"""
for i in list_json[:-2]:
if not i["name"]:
continue
new_key = key + i["name"] if key == "/" else key + "/" + i["name"]
try:
if i["type"] == "F":
self.key_queue.put_nowait(
{"key": new_key, "x-list-iter": None, "hostname": hostname})
else:
                    # create the local directory under base_save_path, mirroring the path inside the bucket
                    save_dir = base_save_path + "/" + bucket + key
                    try:
                        if not os.path.exists(save_dir):
                            os.makedirs(save_dir)
                    except OSError as e:
                        print("error creating directory: " + str(e))
                    save_path = base_save_path + "/" + bucket + new_key
if not os.path.isfile(save_path):
self.pic_queue.put_nowait(
{"key": new_key, "save_path": save_path, "x-list-iter": None, "hostname": hostname})
else:
print(f"文件已存在:{save_path}")
except Exception as e:
print("下载文件错误!:" + str(e))
async with aiofiles.open("download_err.txt", "a") as f:
await f.write(new_key + "\n")
if list_json[-1] != "g2gCZAAEbmV4dGQAA2VvZg":
self.key_queue.put_nowait({"key": key, "x-list-iter": list_json[-1], "hostname": hostname})
    async def run(self):
        """Start the worker tasks and wait for both queues to drain"""
        workers = [asyncio.Task(self.work(), loop=self.loop)
                   for _ in range(self.max_tasks)]
        workers_pic = [asyncio.Task(self.work_pic(), loop=self.loop)
                       for _ in range(self.pic_task)]
        await self.key_queue.join()
        await self.pic_queue.join()
        workers.extend(workers_pic)  # extend, not append, so every task object gets cancelled
        for w in workers:
            w.cancel()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
    crawler = Crawler("/", hostname, max_tasks=5, pic_task=150)
loop.run_until_complete(crawler.run())
crawler.close()
loop.close()
The crawler above hits roughly 400 images per second. But then the next problem showed up...
Version 2
The script above was already more than enough and could pretty much saturate the bandwidth, but a new problem appeared:
each disk used to store the images is only 4 TB, while one of the UPYUN buckets had already grown to 7 TB. That means two machines have to download in parallel, so a message queue (RabbitMQ) came into play.
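As a rough illustration of how the pieces talk to each other (my own sketch, assuming a RabbitMQ broker on localhost with default guest credentials; it is not part of the original scripts): the directory crawler publishes one JSON message per image to a durable queue, and any number of downloader processes, on any machine, consume from that same queue.

import asyncio
import asynqp

async def demo():
    # connect to a RabbitMQ broker (hypothetical localhost/guest credentials)
    connection = await asynqp.connect(host="localhost", username="guest", password="guest")
    channel = await connection.open_channel()
    # the same topology the real scripts declare: a direct exchange plus a durable queue
    exchange = await channel.declare_exchange("pic.exchange", "direct")
    queue = await channel.declare_queue("pic.queue", durable=True)
    await queue.bind(exchange, "routing.pic.key")
    # producer side: publish one image task as JSON
    exchange.publish(asynqp.Message({"key": "/2017/a.jpg", "save_path": "/bucket_name/2017/a.jpg",
                                     "hostname": "http://xxxxxx", "bucket": "bucket_name"}),
                     "routing.pic.key")
    await asyncio.sleep(1)  # give the broker a moment to route the message
    # consumer side: any machine bound to the same durable queue can pick this task up
    msg = await queue.get()
    if msg is not None:
        print(msg.json())
        msg.ack()
    await connection.close()

asyncio.get_event_loop().run_until_complete(demo())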
Directory crawler: it walks every directory in the bucket and pushes the keys into MQ:
import aiohttp
import asyncio
import base64
import urllib
import aiofiles
import asynqp
import os
base_save_path = "f"
mq_host = "192.168.199.13"
mq_user = "admin"
mq_password = "123123"
bucket = "bucket_name"
hostname = "http://xxxxxx"
username = "username"
password = "password"
auth = base64.b64encode(f"{username}:{password}".encode(encoding="utf-8")).decode("utf-8")
headers = {}
headers["Authorization"] = "Basic " + auth
headers["User-Agent"] = "UPYUN_DOWNLOAD_SCRIPT"
headers["x-list-limit"] = "300"
class Spider:
def __init__(self, max_task=10, max_tried=4):
print(f"新建spider! 线程数:{max_task} 每次最多重试次数: {max_tried}")
self.loop = asyncio.get_event_loop()
self.max_tries = max_tried
self.max_task = max_task
self.session = aiohttp.ClientSession(loop=self.loop)
def close(self):
"""回收http session"""
self.session.close()
async def download_work(self):
try:
while True:
received_message = await self.queue.get()
if received_message is None:
await asyncio.sleep(1)
continue
msg_json = received_message.json()
await self.handle(msg_json)
received_message.ack()
                await asyncio.sleep(500)  # the listing side runs much faster than the downloaders can consume, so a long delay was added
except asyncio.CancelledError:
pass
async def handle(self, key):
"""处理接口请求"""
url = "/" + key["bucket"] + \
(lambda x: x[0] == "/" and x or "/" + x)(key["key"])
url = url.encode("utf-8")
url = urllib.parse.quote(url)
if key["x-list-iter"] is not None:
if key["x-list-iter"] is not None or not "g2gCZAAEbmV4dGQAA2VvZg":
headers["X-List-Iter"] = key["x-list-iter"]
tries = 0
while tries < self.max_tries:
try:
reque_url = "http://v0.api.upyun.com" + url
print(f"请求接口:{reque_url}")
async with self.session.get(reque_url, headers=headers, timeout=60) as response:
content = await response.text()
                    # pagination marker for the next page; fall back to the end-of-list marker when absent
                    iter_header = response.headers.get("x-upyun-list-iter", "g2gCZAAEbmV4dGQAA2VvZg")
list_json_param = content + "`" + \
str(response.status) + "`" + str(iter_header)
await self.do_file(self.get_list(list_json_param), key["key"], key["hostname"], key["bucket"])
break
            except (aiohttp.ClientError, asyncio.TimeoutError):  # timeouts should also trigger a retry
pass
tries += 1
    def get_list(self, content):
        """Split the raw response into [file entries..., status code, iter marker]"""
        # print(content)
        if content:
            content = content.split("`")
            items = content[0].split("\n")
            content = [dict(zip(["name", "type", "size", "time"], x.split("\t")))
                       for x in items] + content[1].split() + content[2].split()
return content
else:
return None
async def do_file(self, list_json, key, hostname, bucket):
"""处理接口数据"""
for i in list_json[:-2]:
if not i["name"]:
continue
new_key = key + i["name"] if key == "/" else key + "/" + i["name"]
try:
if i["type"] == "F":
await self.put_key_queue({"key": new_key, "x-list-iter": None, "hostname": hostname, "bucket": bucket})
# self.key_queue.put_nowait(
# {"key": new_key, "x-list-iter": None, "hostname": hostname, "bucket": bucket})
else:
save_path = "/" + bucket + new_key
if not os.path.isfile(base_save_path + save_path):
await self.put_pic_queue({"key": new_key, "save_path": save_path, "x-list-iter": None, "hostname": hostname, "bucket": bucket})
#else:
# print(f"文件已存在:{base_save_path}{save_path}")
except Exception as e:
print("下载文件错误!:" + str(e))
async with aiofiles.open("download_err.txt", "a") as f:
await f.write(new_key + "\n")
if list_json[-1] != "g2gCZAAEbmV4dGQAA2VvZg":
# self.key_queue.put_nowait(
# {"key": key, "x-list-iter": list_json[-1], "hostname": hostname, "bucket": bucket})
await self.put_key_queue({"key": key, "x-list-iter": list_json[-1], "hostname": hostname, "bucket": bucket})
async def put_key_queue(self, obj):
msg = asynqp.Message(obj)
self.exchange.publish(msg, f"{bucket}.routing.key.key")
async def put_pic_queue(self, obj):
msg = asynqp.Message(obj)
self.pic_exchange.publish(msg, "routing.pic.key")
async def run(self):
self.connection = await asynqp.connect(host=mq_host, username=mq_user, password=mq_password)
self.channel = await self.connection.open_channel()
self.exchange = await self.channel.declare_exchange("key.exchange", "direct")
self.queue = await self.channel.declare_queue(f"{bucket}.key.queue", durable=True)
await self.queue.bind(self.exchange, f"{bucket}.routing.key.key")
self.channel_pic = await self.connection.open_channel()
self.pic_exchange = await self.channel_pic.declare_exchange("pic.exchange", "direct")
self.pic_queue = await self.channel_pic.declare_queue("pic.queue", durable=True)
await self.pic_queue.bind(self.pic_exchange, "routing.pic.key")
        # only a brand-new bucket needs its root directory seeded here
# await self.put_key_queue({"key": "/", "x-list-iter": None,"hostname": hostname, "bucket": bucket})
for _ in range(self.max_task):
asyncio.ensure_future(self.download_work())
await asyncio.sleep(2.0)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
spider = Spider(max_task=10)
# asyncio.ensure_future(spider.run())
loop.run_until_complete(spider.run())
loop.run_forever()
# print(f"Pending tasks at exit:{asyncio.Task.all_tasks(loop)}")
spider.close()
loop.close()
Below is the download service. It can resume where it left off (files that already exist locally are skipped) and can run on several machines at the same time (see the deployment note after the code):
import asyncio
import urllib
import os
import asynqp
import aiofiles
import aiohttp
import time
class Spider:
def __init__(self, base_save_path, max_tried=4, mq_host="192.168.199.13", mq_user="admin", mq_password="123123"):
self.loop = asyncio.get_event_loop()
self.max_tries = max_tried
self.session = aiohttp.ClientSession(loop=self.loop)
self.mq_host = mq_host
self.mq_user = mq_user
self.mq_password = mq_password
self.base_save_path = base_save_path
def __del__(self):
print("进程完成!,关闭aiohttp.session 和mq连接")
self.session.close()
async def download_work(self):
print(f"创建mq连接")
self.connection = await asynqp.connect(host=self.mq_host, username=self.mq_user, password=self.mq_password)
self.channel = await self.connection.open_channel()
self.exchange = await self.channel.declare_exchange("pic.exchange", "direct")
self.queue = await self.channel.declare_queue("pic.queue", durable=True)
await self.queue.bind(self.exchange, "routing.pic.key")
print("连接成功!,建立队列通道")
        try:
            for _ in range(10000):
                received_message = await self.queue.get()
                if received_message is None:
                    # the queue is empty right now, back off briefly
                    await asyncio.sleep(1)
                    continue
                msg_json = received_message.json()
                try:
                    await self.handle_pic(msg_json)
                except Exception:
                    # record the failed key so it can be retried later
                    async with aiofiles.open("download_error.txt", "a") as f:
                        await f.write(msg_json["hostname"] + msg_json["key"] + "\n")
                finally:
                    received_message.ack()
                    await asyncio.sleep(0.01)
                del received_message, msg_json  # drop references early
        except asyncio.CancelledError:  # task cancelled on shutdown
            pass
async def handle_pic(self, key):
"""处理图片请求"""
url = (lambda x: x[0] == "/" and x or "/" + x)(key["key"])
url = url.encode("utf-8")
url = urllib.parse.quote(url)
pic_url = key["hostname"] + url + "!s400"
        # del url  # drop reference early
tries = 0
while tries < self.max_tries:
try:
# print(f"请求图片:{pic_url}")
async with self.session.get(pic_url, timeout=60) as response:
save_path = self.base_save_path + key["save_path"]
dir_name = os.path.dirname(save_path)
# print(f"文件夹路径:{dir_name}")
try:
if not os.path.exists(dir_name):
os.makedirs(dir_name)
except OSError as e:
print("新建文件夹错误:" + str(e))
if os.path.isfile(save_path):
break
async with aiofiles.open(save_path, "wb") as f:
print(f"保存文件:{save_path}")
await f.write(await response.read())
del save_path, dir_name
break
            except (aiohttp.ClientError, asyncio.TimeoutError):  # timeouts should also trigger a retry
pass
tries += 1
        del url, pic_url  # drop references early
def restart_program():
    """Re-exec the current script; the service restarts itself after finishing each batch of messages"""
    import sys
    import os
    python = sys.executable
    os.execl(python, python, *sys.argv)
def main():
base_path = "/Users/luoda/Documents/project/pic_downloader"
thread_count = 2
loop = asyncio.get_event_loop()
spider = Spider(base_path)
works = []
for _ in range(int(thread_count)):
works.append(spider.download_work())
loop.run_until_complete(asyncio.wait(works))
print(f"麻痹的执行完了")
time.sleep(20)
restart_program()
if __name__ == "__main__":
main()
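A deployment note (my own sketch, not from the original post): to split one oversized bucket across two disks, each machine runs this same download service against the shared RabbitMQ host, with base_path pointing at its own disk. RabbitMQ hands every message in the durable pic.queue to exactly one consumer, and files already on disk are skipped, so a crashed or restarted machine simply picks up where the queue left off. The path below is hypothetical, and the snippet assumes the Spider class from the service above is in scope.

import asyncio

def main_machine2():
    # identical service on a second machine; only the local save root differs
    loop = asyncio.get_event_loop()
    spider = Spider("/mnt/disk2/pic_downloader", mq_host="192.168.199.13")
    works = [spider.download_work() for _ in range(2)]
    loop.run_until_complete(asyncio.wait(works))

if __name__ == "__main__":
    main_machine2()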
Results
Q.E.D.