确保Python已安装所有依赖项:pandas、openpyxl、requests、fake-useragent和tenacity
把需要下载的图片链接和修改后的名字分别放在Excel文件Down.xlsx的"Image Src"列和"New Name"列
pip install pandas openpyxl requests fake-useragent tenacity
把以下代码保存为download_images.py文件:
import logging
import os
import random
import time
import urllib.parse
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from datetime import datetime
from pathlib import Path

import pandas as pd
import requests
from fake_useragent import UserAgent
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)
# Logging: failures are appended to a log file; only ERROR and above are kept.
logging.basicConfig(
    level=logging.ERROR,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='download_errors.log',
)

# Download directory: every run gets its own timestamped "Task_..." folder
# under the base directory, created here if it does not exist yet.
base_dir = r"D:\Down"
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
download_dir = os.path.join(base_dir, f"Task_{timestamp}")
os.makedirs(download_dir, exist_ok=True)

# Hard-coded path of the Excel workbook that lists the images to download.
excel_file = r"C:\Users\XXX\Downloads\Down.xlsx"

# User-Agent generator; a random value is drawn from it for each request.
ua = UserAgent()

# Optional proxy settings (uncomment and configure if a proxy is required).
# proxies = {
#     'http': 'http://your_proxy:port',
#     'https': 'https://your_proxy:port'
# }
# Download a single image.

# Failures where no HTTP response was obtained; these are worth retrying.
_TRANSIENT_ERRORS = (
    requests.exceptions.ConnectionError,
    requests.exceptions.Timeout,
)


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    retry=retry_if_exception_type(_TRANSIENT_ERRORS),
    reraise=True,
)
def _fetch_image(image_url, session):
    """Issue the GET request for *image_url*, retrying transient failures.

    Connection errors and timeouts are retried (3 attempts in total, with
    exponential back-off between 4 and 10 seconds). Any other exception, and
    the last transient one once the attempts are used up, propagates to the
    caller unchanged because of ``reraise=True``.

    Returns the ``requests`` response object whatever its HTTP status is.
    """
    headers = {'User-Agent': ua.random}
    # Pass proxies=proxies here if a proxy is needed.
    return session.get(image_url, timeout=10, headers=headers)


def _is_blank(value):
    """Return True for None/NaN (an empty Excel cell) or whitespace-only text."""
    return bool(pd.isna(value)) or not str(value).strip()


def download_image(row, index, session):
    """Download the image described by one spreadsheet row.

    Args:
        row: Mapping-like row providing the ``'Image Src'`` (URL) and
            ``'New Name'`` (target file name without extension) values.
        index: Row label; returned untouched so the caller can match the
            result to its row.
        session: ``requests.Session`` used to perform the HTTP request.

    Returns:
        A ``(index, full_path, status)`` tuple. ``full_path`` is the path of
        the written file on success and ``''`` otherwise. ``status`` is
        ``'Success'``, ``'Missing URL or Name'``, ``'HTTP <code>'`` or
        ``'Error: <message>'``.

    This function never raises: every failure is logged and reported through
    ``status`` so that one bad row cannot abort the whole batch.
    """
    image_url = row['Image Src']
    new_name = row['New Name']
    # Empty Excel cells are read as NaN, which is truthy, so a plain
    # ``not value`` check would let them through.
    if _is_blank(image_url) or _is_blank(new_name):
        return index, '', 'Missing URL or Name'
    image_url = str(image_url).strip()

    try:
        # Reuse the extension found in the URL path, defaulting to .jpg.
        url_path = urllib.parse.urlparse(image_url).path
        ext = os.path.splitext(url_path)[1] or '.jpg'
        full_path = os.path.join(download_dir, f"{new_name}{ext}")

        # The retry logic lives in the helper: retrying only works while the
        # exception is still propagating, i.e. before it is caught below.
        response = _fetch_image(image_url, session)
        if response.status_code != 200:
            logging.error(
                "Failed to download %s: HTTP %s", image_url, response.status_code
            )
            return index, '', f"HTTP {response.status_code}"

        with open(full_path, 'wb') as f:
            f.write(response.content)
        return index, full_path, 'Success'
    except requests.exceptions.RequestException as e:
        logging.error("Failed to download %s: %s", image_url, e)
        return index, '', f"Error: {str(e)}"
    except Exception as e:
        # Worker boundary: report anything unexpected (e.g. a file system
        # error) as a row status instead of crashing the thread.
        logging.error("Unexpected error for %s: %s", image_url, e)
        return index, '', f"Error: {str(e)}"
# Main processing function.
def _task_outcome(index, future):
    """Return ``(full_path, status)`` for a finished task, even if it raised."""
    try:
        _, full_path, status = future.result()
    except Exception as e:
        # A crashed task must not prevent the other rows from being recorded.
        logging.error("Download task for row %s failed: %s", index, e)
        return '', f"Error: {e}"
    return full_path, status


def process_downloads():
    """Download every image listed in the Excel file and save a status report.

    Reads ``excel_file``, submits one ``download_image`` task per row to a
    pool of 10 worker threads, writes each task's result into the
    ``'Full Path'`` and ``'Status'`` columns, and saves the updated table as
    ``updated_excel.xlsx`` inside ``download_dir``.

    Returns None. If the Excel file does not exist, an error is printed and
    nothing else is done.
    """
    # Read the Excel file.
    try:
        df = pd.read_excel(excel_file)
    except FileNotFoundError:
        print(f"Error: Excel file {excel_file} not found.")
        return

    # Make sure the required columns exist.
    required_columns = ['Image Src', 'New Name', 'Full Path', 'Status']
    for col in required_columns:
        if col not in df.columns:
            df[col] = ''
    # Result columns that exist but are empty are loaded as float NaN; cast
    # them to object so that text can be stored in them.
    for col in ('Full Path', 'Status'):
        df[col] = df[col].astype(object)

    submitted = []  # (index, future) for every row, so no result is lost
    # One shared Session keeps connections alive between requests.
    with requests.Session() as session:
        with ThreadPoolExecutor(max_workers=10) as executor:
            in_flight = set()
            for index, row in df.iterrows():
                future = executor.submit(download_image, row, index, session)
                submitted.append((index, future))
                in_flight.add(future)
                # Once 10 tasks are in flight, wait for any one of them to
                # finish, then pause 1-3 seconds before submitting more.
                if len(in_flight) >= 10:
                    _, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
                    time.sleep(random.uniform(1, 3))
            print("Waiting for remaining tasks...")
            # Leaving the executor block waits for all outstanding tasks.

    # Record the outcome of every task, including those that completed while
    # the submission loop was still running.
    for index, future in submitted:
        full_path, status = _task_outcome(index, future)
        df.at[index, 'Full Path'] = full_path
        df.at[index, 'Status'] = status

    # Save the updated table next to the downloaded files.
    output_file = os.path.join(download_dir, "updated_excel.xlsx")
    df.to_excel(output_file, index=False)
    print(f"处理完成,更新后的 Excel 已保存至: {output_file}")
# Script entry point: start the batch download only when this file is run
# directly, not when it is imported as a module.
if __name__ == "__main__":
    process_downloads()


