Scraping Videos with Python (Actually a Treat)


It was drizzling outside, and while browsing around, this single programmer stumbled on something good: the top highlighted answer to the Zhihu question "What do you all use Python for?".

I went over for a look, and the video URLs are all in plain text. Right then, let's get started.

To download a file as a stream, just set stream=True on the request in the requests library; the documentation is here.
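(A quick aside on what stream=True actually buys you: requests fetches only the status line and headers up front and leaves the body on the connection until you iterate over it. The URL below is a placeholder, not one from the post.)

import requests

r = requests.get('http://example.com/some-video.mp4', stream=True)  # placeholder URL
print r.status_code, r.headers.get('content-length')  # headers are already here
# the body has not been downloaded yet; it only comes down via r.iter_content()
r.close()  # hand the connection back to the pool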

First, grab one video URL to experiment with:

# -*- coding: utf-8 -*-

import requests

def download_file(url, path):
    with requests.get(url, stream=True) as r:
        chunk_size = 1024
        content_size = int(r.headers['content-length'])
        print 'Download started'
        with open(path, "wb") as f:
            for chunk in r.iter_content(chunk_size=chunk_size):
                f.write(chunk)

if __name__ == '__main__':
    url = 'it is in the original post...'
    path = 'save it wherever you like'
    download_file(url, path)

Then came a blow to the head:

AttributeError: __exit__

So even the documentation lies!

Looks like the response object doesn't implement the __exit__ method the context manager protocol needs. Since all I want is to make sure r gets closed at the end to release the connection back to the pool, contextlib's closing will do the job:
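If you haven't used closing before, it simply wraps any object that has a close() method and guarantees close() gets called when the with block exits. A tiny sketch with a made-up class:

from contextlib import closing

class FakeResponse(object):
    # stand-in for an object that has close() but no __enter__/__exit__
    def close(self):
        print 'connection released'

with closing(FakeResponse()) as r:
    print 'working with the response'
# 'connection released' is printed the moment the block exits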

# -*- coding: utf-8 -*-

import requests
from contextlib import closing

def download_file(url, path):
    # closing() guarantees r.close() is called when the block exits
    with closing(requests.get(url, stream=True)) as r:
        chunk_size = 1024
        content_size = int(r.headers['content-length'])
        print 'Download started'
        with open(path, "wb") as f:
            for chunk in r.iter_content(chunk_size=chunk_size):
                f.write(chunk)

The program runs fine now, but staring at the file I can't see its size changing at all. How far along is it, really? Better to push the downloaded content onto the disk promptly, which also saves a bit of memory:

# -*- coding: utf-8 -*-

import requests
from contextlib import closing
import os

def download_file(url, path):
    with closing(requests.get(url, stream=True)) as r:
        chunk_size = 1024
        content_size = int(r.headers['content-length'])
        print 'Download started'
        with open(path, "wb") as f:
            for chunk in r.iter_content(chunk_size=chunk_size):
                f.write(chunk)
                f.flush()             # push Python's buffer out to the OS
                os.fsync(f.fileno())  # ask the OS to flush its cache to disk

Now the file grows at a speed visible to the naked eye, and I genuinely feel sorry for my hard drive. Better to leave the writing to the very end after all, and just keep a count inside the program:

def download_file(url, path):
    with closing(requests.get(url, stream=True)) as r:
        chunk_size = 1024
        content_size = int(r.headers['content-length'])
        print 'Download started'
        with open(path, "wb") as f:
            n = 1
            for chunk in r.iter_content(chunk_size=chunk_size):
                loaded = n*1024.0/content_size
                f.write(chunk)
                print 'Downloaded {0:%}'.format(loaded)
                n += 1

Now the result is much more intuitive:

Downloaded 2.579129%
Downloaded 2.581255%
Downloaded 2.583382%
Downloaded 2.585508%

But with ambitions as grand as mine, how could I settle for just this one? Let's write a class to go with it:

# -*- coding: utf-8 -*-

import requests
from contextlib import closing
import time

def download_file(url, path):
    with closing(requests.get(url, stream=True)) as r:
        chunk_size = 1024*10
        content_size = int(r.headers['content-length'])
        print 'Download started'
        with open(path, "wb") as f:
            p = ProgressData(size=content_size, unit='Kb', block=chunk_size)
            for chunk in r.iter_content(chunk_size=chunk_size):
                f.write(chunk)
                p.output()

class ProgressData(object):

    def __init__(self, block, size, unit, file_name=''):
        self.file_name = file_name
        self.block = block/1000.0
        self.size = size/1000.0
        self.unit = unit
        self.count = 0
        self.start = time.time()

    def output(self):
        self.end = time.time()
        self.count += 1
        # speed of the last block; guard against a zero time delta
        speed = self.block/(self.end-self.start) if (self.end-self.start) > 0 else 0
        self.start = time.time()
        loaded = self.count*self.block
        progress = round(loaded/self.size, 4)
        if loaded >= self.size:
            print u'%s download complete\r\n' % self.file_name
        else:
            print u'{0} progress {1:.2f}{2}/{3:.2f}{4} {5:.2%} speed {6:.2f}{7}/s'.\
                  format(self.file_name, loaded, self.unit,
                         self.size, self.unit, progress, speed, self.unit)
            print '%50s' % ('/'*int((1-progress)*50))

Run it:

Download started
progress 10.24Kb/120174.05Kb 0.01% speed 4.75Kb/s
/
progress 20.48Kb/120174.05Kb 0.02% speed 32.93Kb/s
/

Much easier on the eyes.
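By the way, if you want to poke at ProgressData without touching the network, a dry run like this (the chunk count, sizes and file name are all made up) exercises it just fine:

import time

# simulate a 100-chunk download, 10Kb per chunk, no network involved
p = ProgressData(block=1024*10, size=1024*10*100, unit='Kb', file_name='test.mp4')
for i in xrange(100):
    time.sleep(0.01)  # pretend a chunk just arrived
    p.output()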

Next up is downloading with multiple threads at once: the main thread produces URLs and puts them on a queue, and the download threads take URLs off it:

# -*- coding: utf-8 -*-

import requests
from contextlib import closing
import time
import Queue
import hashlib
import threading
import os

def download_file(url, path):
    with closing(requests.get(url, stream=True)) as r:
        chunk_size = 1024*10
        content_size = int(r.headers['content-length'])
        # skip files that have already been fully downloaded
        if os.path.exists(path) and os.path.getsize(path) >= content_size:
            print 'Already downloaded'
            return
        print 'Download started'
        with open(path, "wb") as f:
            p = ProgressData(size=content_size, unit='Kb', block=chunk_size, file_name=path)
            for chunk in r.iter_content(chunk_size=chunk_size):
                f.write(chunk)
                p.output()

class ProgressData(object):

    def __init__(self, block, size, unit, file_name=''):
        self.file_name = file_name
        self.block = block/1000.0
        self.size = size/1000.0
        self.unit = unit
        self.count = 0
        self.start = time.time()

    def output(self):
        self.end = time.time()
        self.count += 1
        speed = self.block/(self.end-self.start) if (self.end-self.start) > 0 else 0
        self.start = time.time()
        loaded = self.count*self.block
        progress = round(loaded/self.size, 4)
        if loaded >= self.size:
            print u'%s download complete\r\n' % self.file_name
        else:
            print u'{0} progress {1:.2f}{2}/{3:.2f}{4} {5:.2%} speed {6:.2f}{7}/s'.\
                  format(self.file_name, loaded, self.unit,
                         self.size, self.unit, progress, speed, self.unit)
            print '%50s' % ('/'*int((1-progress)*50))

queue = Queue.Queue()

def run():
    while True:
        url = queue.get(timeout=100)
        if url is None:
            print u'All done'
            break
        # name the file after the md5 of its URL
        h = hashlib.md5()
        h.update(url)
        name = h.hexdigest()
        path = 'e:/download/' + name + '.mp4'
        download_file(url, path)

def get_url():
    queue.put(None)

if __name__ == '__main__':
    get_url()
    threads = []
    for i in xrange(4):
        t = threading.Thread(target=run)
        t.setDaemon(True)
        t.start()
        threads.append(t)
    # wait for the workers, otherwise the daemon threads die with the main thread
    for t in threads:
        t.join()

I added a check to skip files that are already downloaded. As for how to keep the URLs coming, that's for you to explore. Take care of yourselves!
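To get you started anyway, one simple way to feed the queue could look like this (just a sketch: urls.txt is a hypothetical file with one video URL per line, and one None sentinel is pushed per worker so every thread can exit cleanly):

NUM_WORKERS = 4

def get_url():
    with open('urls.txt') as f:  # hypothetical URL list, one per line
        for line in f:
            url = line.strip()
            if url:
                queue.put(url)
    for _ in xrange(NUM_WORKERS):
        queue.put(None)  # one sentinel per worker thread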