An ultra-high-performance asynchronous Python crawler for downloading UpYun images

Background

A rather odd requirement landed on me recently:
download every image stored on UpYun, roughly 10 TB, around 300 million files.
We asked UpYun whether we could just mail them a hard drive to copy onto. The answer was no, but they did provide a Python download script:
download_file_with_iter.py
I grabbed it gratefully, only to find it managed fewer than 3 images per second. Far too slow, so I decided to write my own.

Version 1

This is the version from right at the start, driving everything with asyncio queues directly; a minimal sketch of the pattern follows the dependency list.
Environment: Python 3.6.2
Required packages (asyncio itself ships with Python 3, so the original's "pip install asyncio" step is unnecessary):

  1. pip install aiohttp
  2. pip install aiofiles
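Before the full script, here is a minimal, self-contained sketch of the pattern it is built on: one asyncio.Queue of directory keys feeding a second queue of image URLs, each drained by a pool of worker tasks, with queue.join() used to detect completion. The names and the fake listing/download handlers below are illustrative only:

import asyncio

async def worker(queue, handle):
    # pull items forever; task_done() is what lets queue.join() finish
    while True:
        item = await queue.get()
        try:
            await handle(item)
        finally:
            queue.task_done()

async def demo():
    key_queue = asyncio.Queue()  # directories still to list
    pic_queue = asyncio.Queue()  # images still to download

    async def list_dir(key):
        # stand-in for the real listing call: each directory "contains" two images
        for i in range(2):
            pic_queue.put_nowait(f'{key}img_{i}.jpg')

    async def fetch_pic(url):
        print('would download', url)

    key_queue.put_nowait('/')
    workers = [asyncio.ensure_future(worker(key_queue, list_dir)) for _ in range(2)]
    workers += [asyncio.ensure_future(worker(pic_queue, fetch_pic)) for _ in range(5)]

    await key_queue.join()  # every directory listed...
    await pic_queue.join()  # ...and every image fetched
    for w in workers:
        w.cancel()
    await asyncio.wait(workers)  # let the cancellations settle

asyncio.get_event_loop().run_until_complete(demo())

The real crawler is the same shape, plus retries, rate limiting, and the UpYun listing API. The full script: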
import asyncio
import base64
import os
import urllib

import aiohttp
import aiofiles

# -----------------------
# -----------------------
bucket = 'bucket_name'   # UpYun bucket (service) name
username = 'username'    # operator account
password = 'password'    # operator password
# Public URL of the bucket. UpYun's plain HTTP downloads are not rate-limited,
# so images are fetched over HTTP rather than through the REST API.
hostname = "http://xxxxx"

# Local root to save into, so the paths on disk mirror the paths in the bucket.
base_save_path = 'f:'
# -----------------------

headers = {}
# UpYun basic-auth header. Note: b64encode() returns bytes, so decode it;
# the original's str(auth) would produce a broken "b'...'" header value.
auth = base64.b64encode(f'{username}:{password}'.encode('utf-8')).decode('utf-8')
headers['Authorization'] = 'Basic ' + auth
headers['User-Agent'] = "UPYUN_DOWNLOAD_SCRIPT"
headers['x-list-limit'] = '300'  # entries per listing page

thread_sleep = 1  # pause between queue items, per worker


def is_dic(url):
    """Guess whether a key is a directory by the absence of a file extension.
    (Left over from an earlier draft; the crawler below actually relies on
    the listing API's type field instead.)"""
    url = url.replace('http://v0.api.upyun.com/', '')
    return '.' not in url


class Crawler:
    def __init__(self, init_key, hostname, max_tasks=10, pic_task=50):
        """Initialize the crawler."""
        self.loop = asyncio.get_event_loop()
        self.max_tries = 4  # retries per request
        self.max_tasks = max_tasks  # number of listing workers
        self.key_queue = asyncio.Queue(loop=self.loop)  # directory-listing queue
        self.pic_queue = asyncio.Queue(loop=self.loop)  # image-download queue
        self.session = aiohttp.ClientSession(loop=self.loop)  # shared async HTTP session
        self.key_queue.put_nowait(
            {'key': init_key, 'x-list-iter': None, 'hostname': hostname})  # seed with the directory to crawl
        self.pic_task = pic_task  # number of download workers

    def close(self):
        """Release the http session."""
        self.session.close()

    async def work(self):
        """Consumer for the directory-listing queue."""
        try:
            while True:
                key = await self.key_queue.get()
                # print(f'key queue size: {self.key_queue.qsize()}')
                await self.handle(key)
                self.key_queue.task_done()
                await asyncio.sleep(thread_sleep)
        except asyncio.CancelledError:
            pass

    async def work_pic(self):
        """Consumer for the image-download queue."""
        try:
            while True:
                pic = await self.pic_queue.get()
                await self.handle_pic(pic)
                self.pic_queue.task_done()
                await asyncio.sleep(thread_sleep)
        except asyncio.CancelledError:
            pass

    async def handle_pic(self, key):
        """Fetch one image and write it to disk."""
        url = key['key'] if key['key'].startswith('/') else '/' + key['key']
        url = urllib.parse.quote(url.encode('utf-8'))

        pic_url = key['hostname'] + url + '!s400'  # '!s400' requests the bucket's s400 thumbnail version

        tries = 0
        while tries < self.max_tries:
            try:
                print(f'fetching image: {pic_url}')
                async with self.session.get(pic_url, timeout=60) as response:
                    async with aiofiles.open(key['save_path'], 'wb') as f:
                        # print('saving file: {}'.format(key['save_path']))
                        await f.write(await response.read())
                break
            except (aiohttp.ClientError, asyncio.TimeoutError):
                # a timeout is not a ClientError, so it has to be caught too
                pass
            tries += 1

    async def handle(self, key):
        """Request one page of a directory listing."""
        path = key['key'] if key['key'].startswith('/') else '/' + key['key']
        url = urllib.parse.quote(('/' + bucket + path).encode('utf-8'))

        # Pass the pagination iterator along unless it is the end-of-list marker.
        # (Note: mutating the shared headers dict from concurrent workers is racy;
        # a per-request copy would be safer.)
        if key['x-list-iter'] is not None and key['x-list-iter'] != 'g2gCZAAEbmV4dGQAA2VvZg':
            headers['X-List-Iter'] = key['x-list-iter']

        tries = 0
        while tries < self.max_tries:
            try:
                reque_url = "http://v0.api.upyun.com" + url
                print(f'requesting listing: {reque_url}')
                async with self.session.get(reque_url, headers=headers, timeout=60) as response:
                    content = await response.text()
                    # a missing header means the listing is finished
                    iter_header = response.headers.get('x-upyun-list-iter') or 'g2gCZAAEbmV4dGQAA2VvZg'
                    # pack body, status, and iterator into one string for get_list()
                    list_json_param = content + "`" + \
                        str(response.status) + "`" + str(iter_header)
                    await self.do_file(self.get_list(list_json_param), key['key'], key['hostname'])
                break
            except (aiohttp.ClientError, asyncio.TimeoutError):
                pass
            tries += 1

    def get_list(self, content):
        """Unpack the 'body`status`iter' string into entry dicts plus status and iterator."""
        if content:
            content = content.split("`")
            items = content[0].split('\n')
            # each body line is 'name\ttype\tsize\ttime'; the last two list
            # elements end up being the http status and the pagination iterator
            content = [dict(zip(['name', 'type', 'size', 'time'], x.split('\t')))
                       for x in items] + content[1].split() + content[2].split()
            return content
        else:
            return None

    async def do_file(self, list_json, key, hostname):
        """Dispatch one page of listing entries to the two queues."""
        for i in list_json[:-2]:
            if not i['name']:
                continue
            new_key = key + i['name'] if key == '/' else key + '/' + i['name']
            try:
                if i['type'] == 'F':  # 'F' marks a sub-directory
                    self.key_queue.put_nowait(
                        {'key': new_key, 'x-list-iter': None, 'hostname': hostname})
                else:
                    try:
                        # create the mirror directory under the local save root
                        # (the original built this path relative to the cwd)
                        dir_path = base_save_path + '/' + bucket + key
                        if not os.path.exists(dir_path):
                            os.makedirs(dir_path)
                    except OSError as e:
                        print('mkdir error: ' + str(e))
                    save_path = base_save_path + '/' + bucket + new_key
                    if not os.path.isfile(save_path):
                        self.pic_queue.put_nowait(
                            {'key': new_key, 'save_path': save_path, 'x-list-iter': None, 'hostname': hostname})
                    else:
                        print(f'file already exists: {save_path}')
            except Exception as e:
                print('dispatch error!: ' + str(e))
                async with aiofiles.open('download_err.txt', 'a') as f:
                    await f.write(new_key + '\n')
        # anything but the EOF marker means more pages: re-queue the same
        # directory with the next iterator
        if list_json[-1] != 'g2gCZAAEbmV4dGQAA2VvZg':
            self.key_queue.put_nowait({'key': key, 'x-list-iter': list_json[-1], 'hostname': hostname})

    async def run(self):
        """Spawn the worker tasks and wait for both queues to drain."""
        workers = [asyncio.Task(self.work(), loop=self.loop)
                   for _ in range(self.max_tasks)]

        workers_pic = [asyncio.Task(self.work_pic(), loop=self.loop)
                       for _ in range(self.pic_task)]

        await self.key_queue.join()
        await self.pic_queue.join()

        # extend, not append: append() would add the list itself and the
        # cancel loop below would blow up on it
        workers.extend(workers_pic)
        for w in workers:
            w.cancel()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    crawler = Crawler('/', hostname, max_tasks=5, pic_task=150)
    loop.run_until_complete(crawler.run())

    crawler.close()

    loop.close()

The crawler above reaches about 400 images per second. But then came the next problem...
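Before moving on, it is worth spelling out the listing pagination that handle() implements, as far as it can be reconstructed from the script (this is my reading of the code, not official API documentation): each GET to http://v0.api.upyun.com/<bucket>/<path> returns up to x-list-limit entries as tab-separated name/type/size/time lines, plus an x-upyun-list-iter response header; sending that value back as X-List-Iter fetches the next page, and the marker g2gCZAAEbmV4dGQAA2VvZg means the listing is done. A minimal single-directory pager under those assumptions:

import aiohttp

EOF_ITER = 'g2gCZAAEbmV4dGQAA2VvZg'

async def list_dir(session, bucket, path, auth_headers):
    """Collect {'name', 'type', 'size', 'time'} entries for one directory."""
    entries = []
    page_headers = dict(auth_headers)  # per-call copy avoids the shared-dict race
    while True:
        async with session.get('http://v0.api.upyun.com/' + bucket + path,
                               headers=page_headers, timeout=60) as resp:
            body = await resp.text()
            next_iter = resp.headers.get('x-upyun-list-iter')
        entries += [dict(zip(['name', 'type', 'size', 'time'], line.split('\t')))
                    for line in body.split('\n') if line]
        if not next_iter or next_iter == EOF_ITER:
            return entries
        page_headers['X-List-Iter'] = next_iter  # resume where the last page stopped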

Version 2

The script above was already more than fast enough and could basically saturate the bandwidth, but a new problem appeared:
each disk for storing images holds only 4 TB, while one of the UpYun buckets has grown to 7 TB. That means two machines have to download at the same time, so RabbitMQ (via asynqp) entered the picture.
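The moving parts are plain AMQP: the crawler publishes JSON messages to direct exchanges, and any number of workers consume from the durable queues bound to them, so the broker hands each image key to exactly one machine. A minimal asynqp round trip using only the calls that appear in the scripts below (host, credentials, and the sample message are placeholders taken from this post's config):

import asyncio
import asynqp

async def demo():
    connection = await asynqp.connect(host='192.168.199.13', username='admin', password='123123')
    channel = await connection.open_channel()
    exchange = await channel.declare_exchange('pic.exchange', 'direct')
    queue = await channel.declare_queue('pic.queue', durable=True)  # survives broker restarts
    await queue.bind(exchange, 'routing.pic.key')

    # publish one message shaped like the ones the directory crawler sends
    exchange.publish(asynqp.Message({'key': '/a/b.jpg', 'save_path': '/bucket_name/a/b.jpg',
                                     'hostname': 'http://xxxxxx', 'bucket': 'bucket_name'}),
                     'routing.pic.key')
    await asyncio.sleep(0.1)  # give the broker a moment before reading back

    msg = await queue.get()  # returns None when the queue is empty
    if msg is not None:
        print(msg.json())
        msg.ack()  # acknowledged messages are dropped by the broker

    await connection.close()

asyncio.get_event_loop().run_until_complete(demo())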

Directory crawler, which walks every directory and pushes the results into MQ:

import aiohttp
import asyncio
import base64  # was missing from the original listing
import urllib
import aiofiles
import asynqp
import os

base_save_path = 'f'
mq_host = '192.168.199.13'
mq_user = 'admin'
mq_password = '123123'

bucket = 'bucket_name'
hostname = "http://xxxxxx"

username = 'username'
password = 'password'

# decode the bytes from b64encode() so the header value is a clean string
auth = base64.b64encode(f'{username}:{password}'.encode('utf-8')).decode('utf-8')

headers = {}
headers['Authorization'] = 'Basic ' + auth
headers['User-Agent'] = "UPYUN_DOWNLOAD_SCRIPT"
headers['x-list-limit'] = '300'


class Spider:
    def __init__(self, max_task=10, max_tried=4):
        print(f'new spider! workers: {max_task}, max retries: {max_tried}')
        self.loop = asyncio.get_event_loop()
        self.max_tries = max_tried
        self.max_task = max_task
        self.session = aiohttp.ClientSession(loop=self.loop)

    def close(self):
        """Release the http session."""
        self.session.close()

    async def download_work(self):
        try:
            while True:
                received_message = await self.queue.get()
                if received_message is None:
                    await asyncio.sleep(1)
                    continue
                msg_json = received_message.json()
                await self.handle(msg_json)
                received_message.ack()
                # the crawler outruns the downloaders, so a long delay was added
                await asyncio.sleep(500)
        except asyncio.CancelledError:
            pass

    async def handle(self, key):
        """Request one page of a directory listing."""
        path = key['key'] if key['key'].startswith('/') else '/' + key['key']
        url = urllib.parse.quote(('/' + key['bucket'] + path).encode('utf-8'))

        # Pass the pagination iterator along unless it is the end-of-list marker.
        if key['x-list-iter'] is not None and key['x-list-iter'] != 'g2gCZAAEbmV4dGQAA2VvZg':
            headers['X-List-Iter'] = key['x-list-iter']

        tries = 0
        while tries < self.max_tries:
            try:
                reque_url = "http://v0.api.upyun.com" + url
                print(f'requesting listing: {reque_url}')
                async with self.session.get(reque_url, headers=headers, timeout=60) as response:
                    content = await response.text()
                    # a missing header means the listing is finished
                    iter_header = response.headers.get('x-upyun-list-iter') or 'g2gCZAAEbmV4dGQAA2VvZg'
                    list_json_param = content + "`" + \
                        str(response.status) + "`" + str(iter_header)
                    await self.do_file(self.get_list(list_json_param), key['key'], key['hostname'], key['bucket'])
                break
            except (aiohttp.ClientError, asyncio.TimeoutError):
                pass
            tries += 1

    def get_list(self, content):
        """Unpack the 'body`status`iter' string into entry dicts plus status and iterator."""
        if content:
            content = content.split("`")
            items = content[0].split('\n')
            content = [dict(zip(['name', 'type', 'size', 'time'], x.split('\t')))
                       for x in items] + content[1].split() + content[2].split()
            return content
        else:
            return None

    async def do_file(self, list_json, key, hostname, bucket):
        """Dispatch one page of listing entries into the two MQ queues."""
        for i in list_json[:-2]:
            if not i['name']:
                continue
            new_key = key + i['name'] if key == '/' else key + '/' + i['name']
            try:
                if i['type'] == 'F':  # 'F' marks a sub-directory
                    await self.put_key_queue({'key': new_key, 'x-list-iter': None, 'hostname': hostname, 'bucket': bucket})
                else:
                    save_path = '/' + bucket + new_key
                    if not os.path.isfile(base_save_path + save_path):
                        await self.put_pic_queue({'key': new_key, 'save_path': save_path, 'x-list-iter': None, 'hostname': hostname, 'bucket': bucket})
                    # else:
                    #     print(f'file already exists: {base_save_path}{save_path}')
            except Exception as e:
                print('dispatch error!: ' + str(e))
                async with aiofiles.open('download_err.txt', 'a') as f:
                    await f.write(new_key + '\n')
        # re-queue the same directory with the next page iterator
        if list_json[-1] != 'g2gCZAAEbmV4dGQAA2VvZg':
            await self.put_key_queue({'key': key, 'x-list-iter': list_json[-1], 'hostname': hostname, 'bucket': bucket})

    async def put_key_queue(self, obj):
        msg = asynqp.Message(obj)
        self.exchange.publish(msg, f'{bucket}.routing.key.key')

    async def put_pic_queue(self, obj):
        msg = asynqp.Message(obj)
        self.pic_exchange.publish(msg, 'routing.pic.key')

    async def run(self):
        self.connection = await asynqp.connect(host=mq_host, username=mq_user, password=mq_password)
        self.channel = await self.connection.open_channel()
        self.exchange = await self.channel.declare_exchange('key.exchange', 'direct')
        self.queue = await self.channel.declare_queue(f'{bucket}.key.queue', durable=True)
        await self.queue.bind(self.exchange, f'{bucket}.routing.key.key')

        self.channel_pic = await self.connection.open_channel()
        self.pic_exchange = await self.channel_pic.declare_exchange('pic.exchange', 'direct')
        self.pic_queue = await self.channel_pic.declare_queue('pic.queue', durable=True)
        await self.pic_queue.bind(self.pic_exchange, 'routing.pic.key')
        
        
        # Only a brand-new bucket needs its root seeded; uncomment for the first run.
        # await self.put_key_queue({'key': '/', 'x-list-iter': None, 'hostname': hostname, 'bucket': bucket})
        for _ in range(self.max_task):
            asyncio.ensure_future(self.download_work())
        await asyncio.sleep(2.0)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    spider = Spider(max_task=10)
    loop.run_until_complete(spider.run())
    loop.run_forever()  # run() returns after spawning workers; keep the loop alive
    # print(f'pending tasks at exit: {asyncio.Task.all_tasks(loop)}')
    spider.close()
    loop.close()
    

Below is the download service. It can resume interrupted runs, and several machines can download at the same time (a note on why that works follows the code):

import asyncio
import urllib
import os
import asynqp
import aiofiles
import aiohttp
import time


class Spider:
    def __init__(self, base_save_path, max_tried=4, mq_host='192.168.199.13', mq_user='admin', mq_password='123123'):
        self.loop = asyncio.get_event_loop()
        self.max_tries = max_tried
        self.session = aiohttp.ClientSession(loop=self.loop)

        self.mq_host = mq_host
        self.mq_user = mq_user
        self.mq_password = mq_password
        self.base_save_path = base_save_path

    def __del__(self):
        print('worker finished; closing aiohttp session and MQ connection')
        self.session.close()

    async def download_work(self):
        print('connecting to MQ')
        self.connection = await asynqp.connect(host=self.mq_host, username=self.mq_user, password=self.mq_password)
        self.channel = await self.connection.open_channel()
        self.exchange = await self.channel.declare_exchange('pic.exchange', 'direct')
        self.queue = await self.channel.declare_queue('pic.queue', durable=True)
        await self.queue.bind(self.exchange, 'routing.pic.key')
        print('connected; channel and queue are ready')

        try:
            # restart after a fixed number of messages to keep memory in check
            for _ in range(10000):
                received_message = await self.queue.get()
                if received_message is None:  # queue is empty right now
                    await asyncio.sleep(1)
                    continue
                msg_json = received_message.json()
                try:
                    await self.handle_pic(msg_json)
                except Exception:
                    async with aiofiles.open('download_error.txt', 'a') as f:
                        await f.write(msg_json['hostname'] + msg_json['key'] + '\n')
                finally:
                    # the original ack()ed in a finally that also ran on the
                    # empty-queue path, where received_message is None
                    received_message.ack()
                    await asyncio.sleep(0.01)

        except asyncio.CancelledError:  # task cancelled on shutdown
            pass

    async def handle_pic(self, key):
        """Fetch one image and write it to disk."""
        url = key['key'] if key['key'].startswith('/') else '/' + key['key']
        url = urllib.parse.quote(url.encode('utf-8'))

        pic_url = key['hostname'] + url + '!s400'  # s400 thumbnail version suffix
        tries = 0
        while tries < self.max_tries:
            try:
                # print(f'fetching image: {pic_url}')
                async with self.session.get(pic_url, timeout=60) as response:
                    save_path = self.base_save_path + key['save_path']
                    dir_name = os.path.dirname(save_path)
                    # print(f'directory: {dir_name}')
                    try:
                        if not os.path.exists(dir_name):
                            os.makedirs(dir_name)
                    except OSError as e:
                        print('mkdir error: ' + str(e))
                    if os.path.isfile(save_path):
                        break  # already downloaded; this check is what makes resuming work

                    async with aiofiles.open(save_path, 'wb') as f:
                        print(f'saving file: {save_path}')
                        await f.write(await response.read())
                break
            except (aiohttp.ClientError, asyncio.TimeoutError):
                pass
            tries += 1

def restart_program():
    import sys
    import os
    python = sys.executable
    os.execl(python, python, * sys.argv)

def main():
    base_path = '/Users/luoda/Documents/project/pic_downloader'
    thread_count = 2

    loop = asyncio.get_event_loop()
    spider = Spider(base_path)
    works = []
    for _ in range(int(thread_count)):
        works.append(spider.download_work())
    loop.run_until_complete(asyncio.wait(works))
    print('damn, the queue actually ran dry')
    time.sleep(20)
    restart_program()  # start over in case more messages arrive

if __name__ == '__main__':
    main()
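Why this gives resume and multi-machine behaviour: every instance of this script binds to the same durable pic.queue, and the broker delivers each message to exactly one consumer, so adding a machine just adds another competing consumer; and because handle_pic() skips keys whose file already exists on disk, a restarted worker simply fast-forwards past everything it already has. A hypothetical per-machine launcher (the argv handling and the parameterized main() are my additions, not part of the original):

import sys

def main(base_path, thread_count):
    # same body as above, with the hard-coded values replaced by arguments
    loop = asyncio.get_event_loop()
    spider = Spider(base_path)
    works = [spider.download_work() for _ in range(thread_count)]
    loop.run_until_complete(asyncio.wait(works))

if __name__ == '__main__':
    # e.g. on machine A: python downloader.py /mnt/disk_a 4
    #      on machine B: python downloader.py /mnt/disk_b 4   (paths are hypothetical)
    main(sys.argv[1] if len(sys.argv) > 1 else '.',
         int(sys.argv[2]) if len(sys.argv) > 2 else 2)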

Results

(Screenshots from 2017-09-06 showing the crawler in action.)
