Reusable and Composable
Overview
When browsing the internet, you often come across appealing images or videos that you would like to download and keep. This article describes how to batch-download images and videos from the web using Python.
Batch-downloading images or videos from the web involves four main steps:
- Generate multiple URLs: the images or videos usually have to be fetched from several URLs;
- Fetch the page content: this prepares the raw HTML for extraction;
- Parse the fetched content: extract the image and video resource links;
- Download the images and videos: an existing download tool can be invoked.
As usual, our goals for these programs are:
- Reusable: write once, use everywhere possible.
- Composable: combinable into tools with different capabilities.
Reusability requires factoring out the generic parts and exposing them as options; composability requires splitting the functionality into sensible subtasks, each implemented by a separate program.
Generating Multiple URLs
Multiple URLs are usually variations of a single base URL, so they can be built from a template t containing the placeholder :p together with a set of parameters.
gu.py

```python
from typing import List
from urllib.parse import urlparse
import argparse


def generate_urls(base_url: str, m: int, n: int) -> List[str]:
    """
    Generate a series of URLs based on a base URL and transformation rules.

    Args:
        base_url (str): The base URL to transform
        m (int): Start number
        n (int): End number (inclusive)

    Returns:
        List[str]: List of generated URLs

    Examples:
        >>> generate_urls("https://example.com/xxx:pyyy", 1, 3)
        ['https://example.com/xxx1yyy', 'https://example.com/xxx2yyy', 'https://example.com/xxx3yyy']
        >>> generate_urls("https://example.com/page_:p.php", 1, 3)
        ['https://example.com/page_1.php', 'https://example.com/page_2.php', 'https://example.com/page_3.php']
    """
    if not base_url or not isinstance(m, int) or not isinstance(n, int):
        raise ValueError("Invalid input parameters")

    if m > n:
        raise ValueError("Start number (m) must be less than or equal to end number (n)")

    # Parse the URL to validate it
    parsed_url = urlparse(base_url)
    if not parsed_url.scheme and not base_url.startswith('//'):
        raise ValueError("Invalid URL format")

    # Handle the :p pattern
    if ":p" in base_url:
        parts = base_url.split(":p")
        if len(parts) != 2:
            raise ValueError("Invalid URL pattern: should contain exactly one :p")
        prefix, suffix = parts
        return [f"{prefix}{i}{suffix}" for i in range(m, n + 1)]

    raise ValueError("URL pattern not supported. Use :p as placeholder for numbers")


def parse_range(range_str: str) -> tuple[int, int]:
    """
    Parse a range string like "1-3" into start and end numbers.

    Args:
        range_str (str): Range string (e.g., "1-3")

    Returns:
        tuple[int, int]: Start and end numbers
    """
    try:
        start, end = map(int, range_str.split("-"))
        return start, end
    except ValueError:
        raise ValueError("Invalid range format. Use 'start-end' (e.g., '1-3')")


def parse_list(list_str: str) -> List[str]:
    """
    Parse a comma-separated string into a list of values.

    Args:
        list_str (str): Comma-separated string (e.g., "1,2,3")

    Returns:
        List[str]: List of values
    """
    return [item.strip() for item in list_str.split(",")]


def main():
    parser = argparse.ArgumentParser(description='Generate a series of URLs based on a pattern')
    parser.add_argument('-u', '--url', required=True, help='Base URL with :p as placeholder')

    # Add mutually exclusive group for range or list
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('-r', '--range', help='Range of numbers (e.g., "1-3")')
    group.add_argument('-l', '--list', help='Comma-separated list of values (e.g., "1,2,3")')

    args = parser.parse_args()

    try:
        if args.range:
            start, end = parse_range(args.range)
            urls = generate_urls(args.url, start, end)
        elif args.list:
            values = parse_list(args.list)
            template = args.url.replace(":p", "{}")
            urls = [template.format(value) for value in values]

        for url in urls:
            print(url)
    except ValueError as e:
        print(f"Error: {e}")
        exit(1)


if __name__ == "__main__":
    main()
```
Usage:

```bash
gu -u "https://www.yituyu.com/gallery/:p/index.html" -l "10234,10140"
```

generates https://www.yituyu.com/gallery/10234/index.html and https://www.yituyu.com/gallery/10140/index.html.

Alternatively:

```bash
gu -u "https://www.yituyu.com/gallery/:p/index.html" -r 1-3
```

generates https://www.yituyu.com/gallery/1/index.html, https://www.yituyu.com/gallery/2/index.html, https://www.yituyu.com/gallery/3/index.html.
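Since generate_urls carries its examples as doctests, they double as a quick self-check. A minimal sketch, assuming gu.py is on the import path:

```python
import doctest

import gu  # assuming gu.py is importable, e.g. run from the tools directory

# Run the doctests embedded in gu.py's docstrings.
results = doctest.testmod(gu, verbose=False)
print(f"doctests: {results.attempted} run, {results.failed} failed")
```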
Fetching Web Page Content
web.py
This uses requests together with chromedriver. Static pages can be fetched with requests directly; dynamic pages need chromedriver to simulate opening the page in a browser, and some pages additionally require scrolling to the bottom before all resources load.

```python
import requests
import time
from pytools.common.common import catchExc
from pytools.con.multitasks import IoTaskThreadPool
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By

delayForHttpReq = 0.5  # 500ms


class HTMLGrasper(object):

    def __init__(self, conf):
        '''
        Configuration options for grabbing HTML page content:
        _async: whether to load the page asynchronously. Set _async = 1
                when the page content is generated dynamically;
        targetIdWhenAsync: used when _async = 1. Asynchronous loading pulls
                in a lot of noise, so an element ID is needed to pinpoint
                the desired part of the content;
        sleepWhenAsync: used when _async = 1. Number of seconds to wait
                while the page loads asynchronously.
        '''
        self._async = conf.get('async', 0)
        self.targetIdWhenAsync = conf.get('targetIdWhenAsync', '')
        self.sleepWhenAsync = conf.get('sleepWhenAsync', 10)

    def batchGrapHtmlContents(self, urls):
        '''
        batch get the html contents of urls
        '''
        grapHtmlPool = IoTaskThreadPool(20)
        return grapHtmlPool.exec(self.getHTMLContent, urls)

    def getHTMLContent(self, url):
        if self._async == 1:
            htmlContent = self.getHTMLContentAsync(url)
            if htmlContent is not None and htmlContent != '':
                html = '<html><head></head><body>' + htmlContent + '</body></html>'
                return html
        return self.getHTMLContentFromUrl(url)

    def getHTMLContentAsync(self, url):
        '''
        get html content from a dynamically loaded html url
        '''
        chrome_options = Options()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--disable-gpu')
        driver = webdriver.Chrome(options=chrome_options)
        try:
            driver.get(url)
            time.sleep(self.sleepWhenAsync)

            # Simulate scrolling to the bottom several times to make sure
            # all content gets loaded
            last_height = driver.execute_script("return document.body.scrollHeight")
            for _ in range(3):  # scroll at most 3 times
                # scroll to the bottom
                driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                # wait for content to load
                time.sleep(2)
                # compute the new scroll height and compare with the last one
                new_height = driver.execute_script("return document.body.scrollHeight")
                if new_height == last_height:
                    break
                last_height = new_height
            try:
                elem = driver.find_element(By.ID, self.targetIdWhenAsync)
            except Exception:
                elem = driver.find_element(By.XPATH, '/html/body')
            return elem.get_attribute('innerHTML')
        finally:
            driver.quit()

    def getHTMLContentFromUrl(self, url):
        '''
        get html content from html url
        '''
        r = requests.get(url)
        status = r.status_code
        if status != 200:
            return ''
        return r.text
```
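A minimal usage sketch, assuming the module is importable as pytools.net.web (the import path used by res.py below); the URLs and the "content" element ID are placeholders:

```python
from pytools.net.web import HTMLGrasper

# async=1 drives a headless Chrome and waits for dynamic content;
# async=0 falls back to a plain requests.get.
conf = {"async": 1, "targetIdWhenAsync": "content", "sleepWhenAsync": 5}
grasper = HTMLGrasper(conf)

pages = grasper.batchGrapHtmlContents([
    "https://example.com/page_1.html",
    "https://example.com/page_2.html",
])
for page in pages:
    print(len(page) if page else 0)  # rough per-page size check
```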
Extracting Image or Video Resources
res.py
This uses BeautifulSoup. Web pages are usually HTML, so we need a program that parses the image or video resource addresses out of HTML content. Modern web pages are typically built on a DIV+CSS+JS stack; image and video resources usually sit in tags like a, img, and video, or in elements identified by a class or id, and the actual address lives in an attribute such as href or src. A little knowledge of HTML and CSS, plus jQuery-style element selection, helps here.

```python
#!/usr/bin/python3
# -*- coding: utf-8 -*-

import re
import sys
import argparse
from bs4 import BeautifulSoup
from pytools.net.web import HTMLGrasper

SaveResLinksFile = '/Users/qinshu/joy/reslinks.txt'
serverDomain = ''


def parseArgs():
    description = '''This program is used to batch fetch url resources from specified urls.
       eg. python3 res.py -u http://xxx.html -r 'img=jpg,png;class=resLink;id=xyz'
           will search resource links from network urls http://xxx.html by specified rules
           img = jpg or png OR class = resLink OR id = xyz [ multiple rules ]
       python3 tools/res.py -u 'http://tu.heiguang.com/works/show_167480.html' -r 'img=jpg!c'
           for <img src="https://www.cnblogs.com/xxx.jpg!c"/>
    '''
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument('-u', '--url', nargs='+', help='At least one html url is required', required=True)
    parser.add_argument('-r', '--rulepath', nargs=1, help='rules to search resources. if not given, search a hrefs or img resources in given urls', required=False)
    parser.add_argument('-o', '--output', nargs=1, help='Specify the output file to save the links', required=False)
    parser.add_argument('-a', '--attribute', nargs=1, help='Extract specified attribute values from matched elements', required=False)
    args = parser.parse_args()
    init_urls = args.url
    rulepath = args.rulepath
    output_file = args.output[0] if args.output else SaveResLinksFile
    attribute = args.attribute[0] if args.attribute else None
    return (init_urls, rulepath, output_file, attribute)


def getAbsLink(serverDomain, link):
    try:
        link_content = link
        if link_content.startswith('//'):
            link_content = 'https:' + link_content
        if link_content.startswith('/'):
            link_content = serverDomain + link_content
        return link_content
    except Exception:
        return ''


def batchGetResTrueLink(resLinks):
    return filter(lambda x: x != '', resLinks)


resTags = set(['img', 'video', 'a', 'div'])


def findWantedLinks(htmlcontent, rule, attribute):
    '''
    find html links or res links from html by rule.
    sub rules such as:
      (1) a link with id=[value1,value2,...]
      (2) a link with class=[value1,value2,...]
      (3) res with src=xxx.jpg|png|mp4|...
    a rule is a map containing sub rules such as:
      { 'id': [id1, id2, ..., idn] } or
      { 'class': [c1, c2, ..., cn] } or
      { 'img': ['jpg', 'png', ... ]} or
      { 'video': ['mp4', ...]}
    '''
    soup = BeautifulSoup(htmlcontent, "lxml")
    all_links = []
    for (key, values) in rule.items():
        # accumulate matches across all values instead of keeping only the last
        link_soups = []
        if key == 'id':
            for id in values:
                link_soups.extend(soup.find_all('a', id=id))
        elif key == 'class':
            for cls in values:
                link_soups.extend(findLinkSoups(soup, ['a', 'img', 'div'], cls))
        elif key in resTags:
            for resSuffix in values:
                if resSuffix != "":
                    link_soups.extend(soup.find_all(key, src=re.compile(resSuffix)))
                else:
                    link_soups.extend(soup.find_all(key))
        # fall back to src/href when no attribute is given explicitly
        attr = attribute if attribute else ('src' if key in ('img', 'video') else 'href')
        all_links.extend([link.get(attr) for link in link_soups if link.get(attr)])
    return all_links


def findLinkSoups(soup, tags, cls):
    all_link_soups = []
    if len(tags) == 0:
        all_link_soups.extend(soup.find_all("a", class_=cls))
    else:
        for tag in tags:
            if cls != "":
                link_soups = soup.find_all(tag, class_=cls)
            else:
                link_soups = soup.find_all(tag)
            all_link_soups.extend(link_soups)
    return all_link_soups


def batchGetLinks(urls, rules, output_file, attribute=None):
    # site-specific: async rendering with the 'page-fav' element as the content root
    conf = {"async": 1, "targetIdWhenAsync": "page-fav", "sleepWhenAsync": 10}
    grasper = HTMLGrasper(conf)
    htmlcontentList = grasper.batchGrapHtmlContents(urls)
    allLinks = []
    for htmlcontent in htmlcontentList:
        for rule in rules:
            links = findWantedLinks(htmlcontent, rule, attribute)
            allLinks.extend(links)

    # append so per-URL invocations (e.g. via xargs -I) don't overwrite earlier links
    with open(output_file, 'a') as f:
        for link in allLinks:
            print(link)
            f.write(link + "\n")


def parseRulesParam(rulesParam):
    '''
    parse the rules param into rules json,
    eg. img=jpg,png;class=resLink;id=xyz becomes
        [{"img": ["jpg", "png"]}, {"class": ["resLink"]}, {"id": ["xyz"]}]
    '''
    defaultRules = [{'img': ['jpg', 'png', 'jpeg']}, {'class': ['*']}]
    if rulesParam:
        try:
            rules = []
            rulesStrArr = rulesParam[0].split(";")
            for ruleStr in rulesStrArr:
                ruleArr = ruleStr.split("=")
                key = ruleArr[0]
                value = ruleArr[1].split(",")
                rules.append({key: value})
            return rules
        except (ValueError, IndexError) as e:
            print('Param Error: invalid rulepath %s %s' % (rulesParam, e))
            sys.exit(1)
    return defaultRules


def testBatchGetLinks():
    urls = ['http://dp.pconline.com.cn/list/all_t145.html']
    rules = [{"img": ["jpg"], "video": ["mp4"]}]
    batchGetLinks(urls, rules, SaveResLinksFile)


if __name__ == '__main__':
    # testBatchGetLinks()
    (init_urls, rulesParam, output_file, attribute) = parseArgs()
    if not output_file:
        output_file = SaveResLinksFile
    rulepath = parseRulesParam(rulesParam)
    batchGetLinks(init_urls, rulepath, output_file, attribute)
```
How do you find the resource address in the first place? Right-click the page and open the browser developer tools, click the element-picker arrow at the top left of the panel, then click the element on the page; the inspector jumps to the corresponding markup. For the gallery page used here, the image address is in the img tag's data-src or src attribute. The src attribute is only populated as you scroll to the bottom of the page, whereas data-src is present as soon as the page loads, so using data-src saves time: it is faster, though less universal. Video addresses work the same way.
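To make the data-src idea concrete, here is a small standalone sketch (the HTML snippet is made up for illustration) of pulling lazy-load attributes with BeautifulSoup, which is essentially what res.py does for -r "class=lazy" -a "data-src":

```python
from bs4 import BeautifulSoup

# Made-up HTML mimicking a lazy-loading gallery page.
html = '''
<div class="gallery">
  <img class="lazy" src="placeholder.gif" data-src="https://cdn.example.com/photo1.jpg">
  <img class="lazy" src="placeholder.gif" data-src="https://cdn.example.com/photo2.jpg">
</div>
'''

soup = BeautifulSoup(html, "lxml")
# Select by class, then read the eagerly populated data-src attribute.
links = [img.get("data-src") for img in soup.find_all("img", class_="lazy")]
print(links)  # ['https://cdn.example.com/photo1.jpg', 'https://cdn.example.com/photo2.jpg']
```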
To write the image resource URLs into ~/Downloads/links5.txt, the whole command is:

```bash
gu -u "https://www.yituyu.com/gallery/:p/index.html" -l "9174,9170" | xargs -I {} python3 ~/tools/pytools/pytools/tools/res.py -u {} -r "class=lazy" -a "data-src" -o ~/Downloads/links5.txt
```
Downloading Images or Videos
dw.py

```python
#!/usr/bin/env python3
import subprocess
import shlex
from pathlib import Path
from typing import Optional, Union, List
import time
import requests
import argparse

default_save_path = "/Users/qinshu/Downloads"


def download(url: str, output_dir: Union[str, Path]) -> Optional[Path]:
    output_dir = Path(output_dir)
    if url.endswith(".jpg") or url.endswith(".png"):
        output_file = output_dir / Path(url).name
        download_image(url, output_file)
        return output_file
    return download_video(url, output_dir)


def download_image(url: str, output_file: Union[str, Path]) -> None:
    try:
        # Send an HTTP GET request for the image
        response = requests.get(url, stream=True)
        response.raise_for_status()  # check that the request succeeded

        # Save the image in binary write mode
        with open(output_file, 'wb') as f:
            for chunk in response.iter_content(1024):
                f.write(chunk)

        print(f"Image saved to: {output_file}")
    except requests.exceptions.RequestException as e:
        print(f"Download failed: {e}")
    except Exception as e:
        print(f"Error occurred: {e}")


def download_video(
    video_url: str,
    output_dir: Union[str, Path] = Path.cwd(),
    timeout: int = 3600,  # 1-hour timeout
    retries: int = 1,
    verbose: bool = True
) -> Optional[Path]:
    """
    Download a video by shelling out to the `y` command
    (a video-downloader command assumed to exist on the PATH).

    Args:
        video_url: video URL (e.g. "https://www.bilibili.com/video/BV1xx411x7xx")
        output_dir: output directory (defaults to the current directory)
        timeout: timeout in seconds
        retries: number of retries
        verbose: show download progress

    Returns:
        The path of the downloaded video on success, None on failure.
    """
    if video_url == "":
        return None
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    cmd = f"y {shlex.quote(video_url)}"
    if verbose:
        print(f"Starting download: {video_url}")
        print(f"Saving to: {output_dir.resolve()}")
        print(f"Running command: {cmd}")

    for attempt in range(1, retries + 1):
        try:
            start_time = time.time()

            # Use Popen so the output can be streamed in real time
            process = subprocess.Popen(
                cmd,
                shell=True,
                cwd=str(output_dir),
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
                bufsize=1
            )

            # Print output as it arrives
            while True:
                output = process.stdout.readline()
                if output == '' and process.poll() is not None:
                    break
                if output and verbose:
                    print(output.strip())

                # Check for timeout
                if time.time() - start_time > timeout:
                    process.terminate()
                    raise subprocess.TimeoutExpired(cmd, timeout)

            # Check the return code
            if process.returncode == 0:
                if verbose:
                    print(f"Download succeeded (attempt {attempt}/{retries})")
                return _find_downloaded_file(output_dir, video_url)
            else:
                raise subprocess.CalledProcessError(process.returncode, cmd)

        except (subprocess.TimeoutExpired, subprocess.CalledProcessError) as e:
            if attempt < retries:
                wait_time = min(attempt * 10, 60)  # linear backoff, capped at 60s
                if verbose:
                    print(f"Attempt {attempt}/{retries} failed, retrying in {wait_time}s...")
                    print(f"Error: {str(e)}")
                time.sleep(wait_time)
            else:
                if verbose:
                    print(f"Download failed: {str(e)}")
                return None


def _find_downloaded_file(directory: Path, video_url: str) -> Optional[Path]:
    """Try to locate the downloaded file automatically."""
    # Adjust this to match the actual output filename pattern of the y command.
    # Example: pick the most recently modified video file
    video_files = sorted(
        directory.glob("*.mp4"),
        key=lambda f: f.stat().st_mtime,
        reverse=True
    )
    return video_files[0] if video_files else None


def read_urls_from_file(file_path: Union[str, Path]) -> List[str]:
    with open(file_path, 'r') as f:
        return [line.strip() for line in f if line.strip()]


def main():
    parser = argparse.ArgumentParser(description="Download tool: fetch videos and images from a URL or a file of URLs")
    parser.add_argument("-u", "--url", help="a single URL to download")
    parser.add_argument("-f", "--file", help="path to a file containing URLs (one per line)")
    parser.add_argument("-o", "--output", default=".", help="output directory (defaults to the current directory)")
    args = parser.parse_args()
    if not args.url and not args.file:
        parser.error("either -u or -f must be provided")

    output_dir = Path(args.output) if args.output else Path(default_save_path)
    output_dir.mkdir(parents=True, exist_ok=True)

    urls = []
    if args.url:
        urls.append(args.url)
    if args.file:
        urls.extend(read_urls_from_file(args.file))
    for url in urls:
        print(f"Processing URL: {url}")
        result = download(url, output_dir)
        if result:
            print(f"Download complete: {result}")
        else:
            print(f"Download failed: {url}")


if __name__ == "__main__":
    main()
```
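A quick sanity check of the image path, assuming dw.py is importable (the URL is a placeholder):

```python
from pathlib import Path

import dw  # assuming dw.py is importable, e.g. run from the tools directory

# download() saves .jpg/.png files via requests and hands anything
# else to the `y` video downloader.
saved = dw.download("https://example.com/sample.jpg", Path.home() / "Downloads")
print(saved)  # path of the saved file, or None on failure
```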
Registering the Tools
To run a tool, you would normally have to type python /path/to/python_file.py, and spelling out the full path every time is tedious. Instead, a shell script can register the Python tools in ~/.zshrc; after that, a single source ~/.zshrc makes them available.

```bash
#!/bin/bash

# Get the absolute path of the tools directory
TOOLS_DIR="/Users/qinshu/tools/pytools/pytools/tools"

# Add a comment to mark the beginning of our tools section
echo -e "\n# === Python Tools Aliases ===" >> ~/.zshrc

# Loop through all Python files in the tools directory
for file in "$TOOLS_DIR"/*.py; do
    if [ -f "$file" ]; then
        # Get just the filename without extension
        filename=$(basename "$file" .py)

        # Skip .DS_Store and any other hidden files
        if [[ $filename != .* ]]; then
            # Create the alias command
            alias_cmd="alias $filename=\"python3 $file\""

            # Check if this alias already exists in .zshrc
            if ! grep -q "alias $filename=" ~/.zshrc; then
                echo "$alias_cmd" >> ~/.zshrc
                echo "Added alias for: $filename"
            else
                echo "Alias already exists for: $filename"
            fi
        fi
    fi
done

echo "All Python tools have been registered in ~/.zshrc"
echo "Please run 'source ~/.zshrc' to apply the changes"
```
This effectively generates a series of aliases:

```bash
alias gu="python3 /Users/qinshu/tools/pytools/pytools/tools/gu.py"
alias res="python3 /Users/qinshu/tools/pytools/pytools/tools/res.py"
alias dw="python3 /Users/qinshu/tools/pytools/pytools/tools/dw.py"
```
so that you can simply run:

```bash
gu -u "https://xxx/:p" -r 1-3
```
Putting it all together, the complete command to batch-download images from the site is:

```bash
gu -u "https://www.yituyu.com/gallery/:p/index.html" -l "9174,9170" | xargs -I {} python3 ~/tools/pytools/pytools/tools/res.py -u {} -r "class=lazy" -a "data-src" -o ~/Downloads/links5.txt && dw -f ~/Downloads/links5.txt -o ~/Downloads
```

A bit long-winded, admittedly, but it wins on generality.
Summary
This article walked through batch-downloading images and videos from the web in four main steps: generating multiple URLs, fetching page content, extracting resource addresses, and downloading the resources. Each step is independent yet feeds into the next, which is what makes the tools composable. Making them general-purpose takes some basic programming knowledge; once you understand a few of the underlying principles, you gain capabilities well beyond what a GUI offers.