#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Spider for the 4kvm streaming site
import sys
import json
import re
import traceback
import urllib.parse
import datetime

import requests
import urllib3
from bs4 import BeautifulSoup

# Silence the InsecureRequestWarning triggered by the verify=False requests below
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Import the base spider class
sys.path.append('../../')
try:
    from base.spider import Spider
except ImportError:
    # Stand-in implementation for local debugging
    class Spider:
        def init(self, extend=""):
            pass


class Spider(Spider):
    def __init__(self):
        # Site base URL
        self.siteUrl = "https://www.4kvm.net"
        # Category ID mapping
        self.cateManual = {
            # "Home": "",
            # "Movies": "movies",
            # "TV shows": "tvshows",
            # "Top-rated movies": "imdb",
            # "Trending": "trending",
        }
        # Default request headers
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
            "Referer": "https://www.4kvm.net/",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
        }

    def getName(self):
        return "4K影视"

    def init(self, extend=""):
        # Initialization hook; nothing to do here
        return

    def isVideoFormat(self, url):
        """Check whether a URL points directly to a video file."""
        if not url:
            return False
        # Check whether the URL ends with a known video extension
        video_extensions = ['.mp4', '.m3u8', '.ts', '.flv', '.avi', '.mkv', '.mov', '.wmv']
        return any(url.lower().endswith(ext) for ext in video_extensions)

    def manualVideoCheck(self):
        """Check whether the home page exposes any player elements."""
        try:
            # Fetch the home page
            response = self.fetch(self.siteUrl)
            if not response or response.status_code != 200:
                return False
            html = response.read().decode('utf-8')  # MockResponse has no .text
            # Look for player-related markers in the HTML
            video_elements = ['player-box', 'video-player', 'play-btn', 'vod-play']
            return any(element in html for element in video_elements)
        except Exception as e:
            self.log(f"Error during manual video check: {str(e)}", "ERROR")
            return False

    # Helper - HTTP requests
    def fetch(self, url, data=None, headers=None):
        """Fetch a page; dict data is sent as GET params, anything else as a POST body."""
        if headers is None:
            headers = self.headers
        try:
            if data:
                if isinstance(data, dict):
                    response = requests.get(url, params=data, headers=headers, verify=False, timeout=10)
                else:
                    response = requests.post(url, data=data, headers=headers, verify=False, timeout=10)
            else:
                response = requests.get(url, headers=headers, verify=False, timeout=10)

            # Wrap the response in an object shaped like the original urllib one
            class MockResponse:
                def __init__(self, response):
                    self.response = response
                    self.status_code = response.status_code

                def read(self):
                    return self.response.content

                def __enter__(self):
                    return self

                def __exit__(self, exc_type, exc_val, exc_tb):
                    self.response.close()

            return MockResponse(response)
        except Exception as e:
            self.log(f"Request failed: {str(e)}", "ERROR")
            return None

    # Logging helper
    def log(self, message, level="INFO"):
        """Write a timestamped log line to stdout and a log file."""
        try:
            # Current timestamp
            now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            # Build the log line
            log_message = f"[{now}] [{level}] {message}"
            # Print it
            print(log_message)
            # Append it to the log file
            with open("py_4kvm.log", "a", encoding="utf-8") as f:
                f.write(log_message + "\n")
        except Exception as e:
            print(f"Error while writing log: {str(e)}")

    # Helper - extract the video ID from a URL
    def extract_vid(self, url):
        """Extract the video ID (slug) from a detail-page URL."""
        try:
            # Regex tuned to the 4kvm URL layout: /movies/<slug>, /tvshows/<slug>, /seasons/<slug>
            pattern = r"/(movies|tvshows|seasons)/([^/\?]+)"
            match = re.search(pattern, url)
            if match:
                return match.group(2)  # the slug part
            return None
        except Exception as e:
            self.log(f"Error extracting video ID: {str(e)}", "ERROR")
            return None
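
    # Illustrative behaviour of extract_vid (the URLs below are hypothetical,
    # shown only to document the expected slug extraction):
    #   extract_vid("https://www.4kvm.net/movies/example-film")   -> "example-film"
    #   extract_vid("https://www.4kvm.net/seasons/example-show")  -> "example-show"
    #   extract_vid("https://www.4kvm.net/about")                 -> None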

    # Main interface implementations
    def homeContent(self, filter):
        """Build the home page payload: category list plus featured videos."""
        result = {}
        try:
            # Categories
            classes = []
            for k, v in self.cateManual.items():
                classes.append({
                    "type_id": v,
                    "type_name": k
                })
            result['class'] = classes
            # Fetch the home page
            response = self.fetch(self.siteUrl)
            if not response or response.status_code != 200:
                return result
            html = response.read().decode('utf-8')
            self.log(f"Fetched home page HTML: {len(html)} bytes")
            soup = BeautifulSoup(html, 'html.parser')
            # Collect movie cards from the featured area of the home page
            videos = []
            # Look for the featured section (the site labels it 热门/推荐,
            # i.e. "hot"/"recommended")
            all_h2s = soup.find_all('h2')
            hot_section = None
            for h2 in all_h2s:
                if h2.text and ('热门' in h2.text or '推荐' in h2.text):
                    hot_section = h2
                    break
            if hot_section:
                self.log(f"Found featured section: {hot_section.text}")
                # Collect the list of items that follows the featured heading
                result_items = []
                # Look for the article-list container after the heading
                next_div = hot_section.find_next('div', class_='items')
                if next_div:
                    result_items = next_div.find_all('article')
                    self.log(f"Found {len(result_items)} featured items")
                if not result_items:
                    # No dedicated container; scan the articles that follow directly
                    result_items = hot_section.find_all_next('article', limit=20)
                    self.log(f"Direct scan found {len(result_items)} items")
                for item in result_items[:20]:  # keep at most 20
                    try:
                        # Link
                        link_tag = item.find('a')
                        if not link_tag:
                            continue
                        link = link_tag.get('href', '')
                        if not link:
                            continue
                        # Video ID
                        vid = self.extract_vid(link)
                        if not vid:
                            continue
                        # Title
                        title = ""
                        title_tag = item.find(['h3', 'div'], class_='title')
                        if title_tag:
                            title = title_tag.text.strip()
                        else:
                            # Fall back to the image alt attribute
                            img_tag = item.find('img')
                            if img_tag and img_tag.get('alt'):
                                title = img_tag.get('alt').strip()
                        if not title:
                            continue
                        # Poster image
                        pic = ""
                        img_tag = item.find('img')
                        if img_tag:
                            pic = img_tag.get('src', '')
                            if not pic:
                                pic = img_tag.get('data-src', '')
                            if not pic:
                                pic = img_tag.get('data-original', '')
                        if pic and not pic.startswith('http'):
                            pic = urllib.parse.urljoin(self.siteUrl, pic)
                        # Rating
                        remarks = ""
                        rating_tag = item.find(['span', 'div'], class_=['rating', 'score'])
                        if rating_tag:
                            remarks = rating_tag.text.strip()
                        # Movie or TV series?
                        tv_tag = item.find('span', class_='tvshows')
                        if tv_tag:
                            if remarks:
                                remarks = f"{remarks} | TV series"
                            else:
                                remarks = "TV series"
                        videos.append({
                            "vod_id": vid,
                            "vod_name": title,
                            "vod_pic": pic,
                            "vod_remarks": remarks
                        })
                        self.log(f"Added home page item: {title}")
                    except Exception as e:
                        self.log(f"Error parsing home page item: {str(e)}", "ERROR")
                        continue
            else:
                self.log("No featured section found; scanning all articles")
                # No featured section; fall back to every article on the page
                all_articles = soup.find_all('article', limit=20)
                self.log(f"Found {len(all_articles)} articles")
                for item in all_articles:
                    try:
                        # Link
                        link_tag = item.find('a')
                        if not link_tag:
                            continue
                        link = link_tag.get('href', '')
                        if not link:
                            continue
                        # Video ID
                        vid = self.extract_vid(link)
                        if not vid:
                            continue
                        # Title
                        title = ""
                        title_tag = item.find(['h3', 'div'], class_='title')
                        if title_tag:
                            title = title_tag.text.strip()
                        else:
                            # Fall back to the image alt attribute
                            img_tag = item.find('img')
                            if img_tag and img_tag.get('alt'):
                                title = img_tag.get('alt').strip()
                        if not title:
                            continue
                        # Poster image
                        pic = ""
                        img_tag = item.find('img')
                        if img_tag:
                            pic = img_tag.get('src', '')
                            if not pic:
                                pic = img_tag.get('data-src', '')
                            if not pic:
                                pic = img_tag.get('data-original', '')
                        if pic and not pic.startswith('http'):
                            pic = urllib.parse.urljoin(self.siteUrl, pic)
                        # Rating
                        remarks = ""
                        rating_tag = item.find(['span', 'div'], class_=['rating', 'score'])
                        if rating_tag:
                            remarks = rating_tag.text.strip()
                        # Movie or TV series?
                        tv_tag = item.find('span', class_='tvshows')
                        if tv_tag:
                            if remarks:
                                remarks = f"{remarks} | TV series"
                            else:
                                remarks = "TV series"
                        videos.append({
                            "vod_id": vid,
                            "vod_name": title,
                            "vod_pic": pic,
                            "vod_remarks": remarks
                        })
                        self.log(f"Added home page item: {title}")
                    except Exception as e:
                        self.log(f"Error parsing home page item: {str(e)}", "ERROR")
                        continue
            result['list'] = videos
        except Exception as e:
            self.log(f"Error building home content: {str(e)}", "ERROR")
        return result

    def homeVideoContent(self):
        """Fetch the recommended videos shown on the home page."""
        result = {'list': []}
        videos = []
        try:
            response = self.fetch(self.siteUrl)
            if response and response.status_code == 200:
                html = response.read().decode('utf-8')
                self.log(f"Fetched home recommendations HTML: {len(html)} bytes")
                soup = BeautifulSoup(html, 'html.parser')
                # Look for the featured section (labelled 热门/推荐 on the site)
                all_h2s = soup.find_all('h2')
                recommended_section = None
                for h2 in all_h2s:
                    if h2.text and ('热门' in h2.text or '推荐' in h2.text):
                        recommended_section = h2
                        break
                if recommended_section:
                    self.log(f"Found featured section: {recommended_section.text}")
                    # Collect the list of items that follows the featured heading
                    result_items = []
                    # Look for the article-list container after the heading
                    next_div = recommended_section.find_next('div', class_='items')
                    if next_div:
                        result_items = next_div.find_all('article')
                        self.log(f"Found {len(result_items)} featured items")
                    else:
                        # No dedicated container; scan the articles that follow directly
                        result_items = recommended_section.find_all_next('article', limit=20)
                        self.log(f"Direct scan found {len(result_items)} items")
                    for item in result_items[:20]:  # keep at most 20
                        try:
                            # Link
                            link_tag = item.find('a')
                            if not link_tag:
                                continue
                            link = link_tag.get('href', '')
                            if not link:
                                continue
                            # Video ID
                            vid = self.extract_vid(link)
                            if not vid:
                                continue
                            # Title
                            title = ""
                            title_tag = item.find(['h3', 'div'], class_='title')
                            if title_tag:
                                title = title_tag.text.strip()
                            else:
                                # Fall back to the image alt attribute
                                img_tag = item.find('img')
                                if img_tag and img_tag.get('alt'):
                                    title = img_tag.get('alt').strip()
                            if not title:
                                continue
                            # Poster image
                            pic = ""
                            img_tag = item.find('img')
                            if img_tag:
                                pic = img_tag.get('src', '')
                                if not pic:
                                    pic = img_tag.get('data-src', '')
                                if not pic:
                                    pic = img_tag.get('data-original', '')
                            if pic and not pic.startswith('http'):
                                pic = urllib.parse.urljoin(self.siteUrl, pic)
                            # Rating
                            remarks = ""
                            rating_tag = item.find(['span', 'div'], class_=['rating', 'score'])
                            if rating_tag:
                                remarks = rating_tag.text.strip()
                            videos.append({
                                'vod_id': vid,
                                'vod_name': title,
                                'vod_pic': pic,
                                'vod_remarks': remarks
                            })
                            self.log(f"Added recommended item: {title}")
                        except Exception as e:
                            self.log(f"Error parsing recommended item: {str(e)}", "ERROR")
                            continue
        except Exception as e:
            self.log(f"Error fetching home recommendations: {str(e)}", "ERROR")
        result['list'] = videos
        return result

    def categoryContent(self, tid, pg, filter, extend):
        """Fetch one page of a category listing."""
        result = {}
        try:
            pg = int(pg)  # the page number may arrive as a string
            # Build the category URL
            if pg > 1:
                url = f"{self.siteUrl}/{tid}/page/{pg}"
            else:
                url = f"{self.siteUrl}/{tid}"
            self.log(f"Fetching category page: {url}")
            # Fetch the listing page
            response = self.fetch(url)
            if not response or response.status_code != 200:
                return result
            html = response.read().decode('utf-8')
            self.log(f"Fetched category page HTML: {len(html)} bytes")
            soup = BeautifulSoup(html, 'html.parser')
            # Collect every listing item
            items = soup.find_all('article')
            self.log(f"Found {len(items)} category items")
            videos = []
            for item in items:
                try:
                    # Link
                    link_tag = item.find('a')
                    if not link_tag:
                        continue
                    link = link_tag.get('href', '')
                    if not link:
                        continue
                    # Video ID
                    vid = self.extract_vid(link)
                    if not vid:
                        continue
                    # Title
                    title = ""
                    title_tag = item.find(['h3', 'div'], class_='title')
                    if title_tag:
                        title = title_tag.text.strip()
                    else:
                        # Fall back to the image alt attribute
                        img_tag = item.find('img')
                        if img_tag and img_tag.get('alt'):
                            title = img_tag.get('alt').strip()
                    if not title:
                        continue
                    # Poster image
                    pic = ""
                    img_tag = item.find('img')
                    if img_tag:
                        pic = img_tag.get('src', '')
                        if not pic:
                            pic = img_tag.get('data-src', '')
                        if not pic:
                            pic = img_tag.get('data-original', '')
                    if pic and not pic.startswith('http'):
                        pic = urllib.parse.urljoin(self.siteUrl, pic)
                    # Rating
                    remarks = ""
                    rating_tag = item.find(['span', 'div'], class_=['rating', 'score'])
                    if rating_tag:
                        remarks = rating_tag.text.strip()
                    # Movie or TV series?
                    tv_tag = item.find('span', class_='tvshows')
                    if tv_tag:
                        if remarks:
                            remarks = f"{remarks} | TV series"
                        else:
                            remarks = "TV series"
                    videos.append({
                        "vod_id": vid,
                        "vod_name": title,
                        "vod_pic": pic,
                        "vod_remarks": remarks
                    })
                    self.log(f"Added category item: {title}")
                except Exception as e:
                    self.log(f"Error parsing category item: {str(e)}", "ERROR")
                    continue
            # Work out the page count from the pagination links
            total_pages = 1
            pagination = soup.find('div', class_='pagination')
            if pagination:
                page_links = pagination.find_all('a')
                for link in page_links:
                    if link.text and link.text.isdigit():
                        total_pages = max(total_pages, int(link.text))
            result = {
                'list': videos,
                'page': pg,
                'pagecount': total_pages,
                'limit': len(videos),
                'total': len(videos) * total_pages  # rough estimate
            }
        except Exception as e:
            self.log(f"Error fetching category content: {str(e)}", "ERROR")
        return result

    def detailContent(self, ids):
        """Build the detail payload (metadata plus play links) for one video."""
        tid = ids[0]
        result = {}
        result["ids"] = ids
        try:
            # Try the TV-series detail page first
            detail_url = f"{self.siteUrl}/seasons/{tid}"
            self.log(f"Trying TV-series detail page: {detail_url}")
            response = self.fetch(detail_url)
            if response and response.status_code == 200:
                # TV-series handling
                html = response.read().decode('utf-8')
                soup = BeautifulSoup(html, 'html.parser')
                # Title
                title = ""
                title_tag = soup.find('h1')
                if title_tag:
                    title = title_tag.text.strip()
                # Poster
                poster = ""
                poster_tag = soup.find('div', class_='poster')
                if poster_tag:
                    img = poster_tag.find('img')
                    if img:
                        poster = img.get('src', '')
                        if poster and not poster.startswith('http'):
                            poster = urllib.parse.urljoin(self.siteUrl, poster)
                # Synopsis
                content = ""
                content_tag = soup.find('div', class_='wp-content')
                if content_tag:
                    content = content_tag.text.strip()
                # Locate the postid embedded in the page
                postid_match = re.search(r'postid\s*[:=]\s*(\d+)', html)
                if postid_match:
                    postid = postid_match.group(1)
                    self.log(f"Found postid: {postid}")
                    # Locate the videourls array
                    videourls_match = re.search(r'videourls\s*[:=]\s*(\[.*?\])', html, re.DOTALL)
                    if videourls_match:
                        videourls = videourls_match.group(1)
                        self.log(f"Found videourls: {videourls[:200]}...")
                        # Clean the JavaScript literal into valid JSON
                        try:
                            # Single quotes -> double quotes
                            clean_json = videourls.replace("'", '"')
                            # Quote bare object keys
                            clean_json = re.sub(r'(\w+):', r'"\1":', clean_json)
                            # Drop trailing commas
                            clean_json = re.sub(r',\s*}', '}', clean_json)
                            clean_json = re.sub(r',\s*]', ']', clean_json)
                            # Patch adjacent objects/arrays
                            clean_json = re.sub(r'}\s*{', '},{', clean_json)
                            clean_json = re.sub(r']\s*\[', '],[', clean_json)
                            self.log(f"Cleaned JSON: {clean_json[:200]}...")
                            # Try to parse it
                            try:
                                videourls_data = json.loads(clean_json)
                                self.log(f"Parsed JSON; found {len(videourls_data)} video sources")
                                # Only use the first video source
                                if videourls_data:
                                    episodes = videourls_data[0]
                                    self.log(f"First source has {len(episodes)} episodes")
                                    # Build a play link for every episode
                                    play_urls = []
                                    for episode in episodes:
                                        if isinstance(episode, dict) and 'name' in episode and 'url' in episode:
                                            ep_name = episode['name']
                                            ep_url = episode['url']
                                            self.log(f"Episode {ep_name}, URL index: {ep_url}")
                                            # The play item carries only the episode label and URL index
                                            play_urls.append(f"Episode {ep_name}${tid}/{ep_url}")
                                    if play_urls:
                                        # Assemble the result
                                        vod = {
                                            'vod_id': tid,
                                            'vod_name': title,
                                            'vod_pic': poster,
                                            'vod_content': content,
                                            'vod_play_from': "4kvm",
                                            'vod_play_url': "#".join(play_urls)
                                        }
                                        result = {'list': [vod]}
                                        self.log("Collected every episode")
                                        return result
                            except json.JSONDecodeError as e:
                                self.log(f"JSON parsing failed: {e}")
                                # Fall back to extracting episodes by regex
                                try:
                                    episodes = []
                                    for match in re.finditer(r'\{"name":(\d+),"url":(\d+)\}', clean_json):
                                        ep_name = match.group(1)
                                        ep_url = match.group(2)
                                        episodes.append({'name': ep_name, 'url': ep_url})
                                    if episodes:
                                        self.log(f"Manual parsing found {len(episodes)} episodes")
                                        # Build a play link for every episode
                                        play_urls = []
                                        for episode in episodes:
                                            ep_name = episode['name']
                                            ep_url = episode['url']
                                            self.log(f"Episode {ep_name}, URL index: {ep_url}")
                                            # The play item carries only the episode label and URL index
                                            play_urls.append(f"Episode {ep_name}${tid}/{ep_url}")
                                        if play_urls:
                                            # Assemble the result
                                            vod = {
                                                'vod_id': tid,
                                                'vod_name': title,
                                                'vod_pic': poster,
                                                'vod_content': content,
                                                'vod_play_from': "4kvm",
                                                'vod_play_url': "#".join(play_urls)
                                            }
                                            result = {'list': [vod]}
                                            self.log("Collected every episode")
                                            return result
                                except Exception as e:
                                    self.log(f"Manual parsing failed: {e}")
                                    self.log(traceback.format_exc())
                        except Exception as e:
                            self.log(f"JSON cleanup failed: {e}")
                            self.log(traceback.format_exc())
                else:
                    self.log("postid not found")
            else:
                # The seasons page 404'd; try the movie detail page instead
                detail_url = f"{self.siteUrl}/movies/{tid}"
                self.log(f"Trying movie detail page: {detail_url}")
                response = self.fetch(detail_url)
                if response and response.status_code == 200:
                    html = response.read().decode('utf-8')
                    soup = BeautifulSoup(html, 'html.parser')
                    # Title
                    title = ""
                    title_tag = soup.find('h1')
                    if title_tag:
                        title = title_tag.text.strip()
                    # Poster
                    poster = ""
                    poster_tag = soup.find('div', class_='poster')
                    if poster_tag:
                        img = poster_tag.find('img')
                        if img:
                            poster = img.get('src', '')
                            if poster and not poster.startswith('http'):
                                poster = urllib.parse.urljoin(self.siteUrl, poster)
                    # Synopsis
                    content = ""
                    content_tag = soup.find('div', class_='wp-content')
                    if content_tag:
                        content = content_tag.text.strip()
                    self.log(f"Movie info: title={title}, poster={poster}, synopsis length={len(content)}")
                    # Try several strategies to locate the postid
                    postid = None
                    # Strategy 1: regex over the raw HTML
                    postid_match = re.search(r'postid\s*[:=]\s*[\'"]?(\d+)[\'"]?', html)
                    if postid_match:
                        postid = postid_match.group(1)
                        self.log(f"Strategy 1 found movie postid: {postid}")
                    # Strategy 2: data attributes on the play buttons
                    if not postid:
                        play_buttons = soup.find_all('a', class_=['watch-btn', 'play-btn'])
                        for btn in play_buttons:
                            data_id = btn.get('data-id') or btn.get('data-post')
                            if data_id and data_id.isdigit():
                                postid = data_id
                                self.log(f"Strategy 2 found movie postid: {postid}")
                                break
                    # Strategy 3: IDs inside player links
                    if not postid:
                        player_links = soup.find_all('a', href=lambda href: href and ('player' in href or 'watch' in href))
                        for link in player_links:
                            href = link.get('href', '')
                            id_match = re.search(r'[?&]id=(\d+)', href)
                            if id_match:
                                postid = id_match.group(1)
                                self.log(f"Strategy 3 found movie postid: {postid}")
                                break
                    # Strategy 4: any plausible ID in an inline script
                    if not postid:
                        scripts = soup.find_all('script')
                        for script in scripts:
                            if script.string:
                                id_matches = re.findall(r'id\s*[:=]\s*[\'"]?(\d+)[\'"]?', script.string)
                                for match in id_matches:
                                    if match.isdigit() and len(match) > 3:  # sanity-check the ID
                                        postid = match
                                        self.log(f"Strategy 4 found movie postid: {postid}")
                                        break
                            if postid:
                                break
                    # Strategy 5: fall back to the ID carried in the URL
                    if not postid:
                        if tid.isdigit():
                            # Use tid itself as the postid
                            postid = tid
                            self.log(f"Strategy 5 using tid as movie postid: {postid}")
                        else:
                            # Extract the numeric part of tid
                            digits = re.search(r'(\d+)', tid)
                            if digits:
                                postid = digits.group(1)
                                self.log(f"Strategy 5 extracted movie postid from tid: {postid}")
                    if postid:
                        # Build the play link
                        play_url = f"Feature${tid}/{postid}"
                        # Assemble the result
                        vod = {
                            'vod_id': tid,
                            'vod_name': title,
                            'vod_pic': poster,
                            'vod_content': content,
                            'vod_play_from': "4kvm",
                            'vod_play_url': play_url
                        }
                        result = {'list': [vod]}
                        self.log("Collected movie info")
                        return result
                    else:
                        self.log("Movie postid not found; scanning page IDs")
                        # Last resort: use the first plausible id= value on the page
                        all_ids = re.findall(r'id=[\'"]?([a-zA-Z0-9_-]+)[\'"]?', html)
                        if all_ids:
                            for possible_id in all_ids:
                                if possible_id.isdigit() and len(possible_id) > 3:
                                    postid = possible_id
                                    self.log(f"Fallback found movie postid: {postid}")
                                    # Build the play link
                                    play_url = f"Feature${tid}/{postid}"
                                    # Assemble the result
                                    vod = {
                                        'vod_id': tid,
                                        'vod_name': title,
                                        'vod_pic': poster,
                                        'vod_content': content,
                                        'vod_play_from': "4kvm",
                                        'vod_play_url': play_url
                                    }
                                    result = {'list': [vod]}
                                    self.log("Collected movie info")
                                    return result
                        self.log("No strategy could find a movie postid")
                else:
                    self.log(f"Failed to fetch movie detail page: {response.status_code if response else 'no response'}")
        except Exception as e:
            self.log(f"Error building detail content: {e}")
            self.log(traceback.format_exc())
        return result
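
    # Note on the play-url format built above: each play item is
    # "<label>$<tid>/<index>" and items are joined with "#", so playerContent()
    # can recover the detail-page slug and the episode/post index by splitting
    # the id it receives on "/".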

    def searchContent(self, key, quick, pg=1):
        """Search the site; TV shows are expanded into one entry per season."""
        result = {}
        try:
            # Build the search request
            search_url = f"{self.siteUrl}/xssearch"
            data = {
                "s": key,
                "paged": pg
            }
            self.log(f"Searching for: {key}, page: {pg}")
            # Fetch the search results
            response = self.fetch(search_url, data=data)
            if not response or response.status_code != 200:
                return result
            html = response.read().decode('utf-8')
            self.log(f"Fetched search page HTML: {len(html)} bytes")
            soup = BeautifulSoup(html, 'html.parser')
            # Collect the result items
            result_items = soup.find_all('div', class_='result-item')
            self.log(f"Found {len(result_items)} search results")
            videos = []
            for item in result_items:
                try:
                    # The link lives inside an <article> tag
                    article = item.find('article')
                    if not article:
                        continue
                    link_tag = article.find('a')
                    if not link_tag:
                        continue
                    link = link_tag.get('href', '')
                    if not link:
                        continue
                    # Title
                    title = ""
                    details = item.find('div', class_='details')
                    title_tag = details.find('div', class_='title') if details else None
                    if title_tag and title_tag.find('a'):
                        title = title_tag.find('a').text.strip()
                    else:
                        # Fall back to the image alt attribute
                        img_tag = article.find('img')
                        if img_tag and img_tag.get('alt'):
                            title = img_tag.get('alt').strip()
                    if not title:
                        continue
                    # Poster image
                    pic = ""
                    img_tag = article.find('img')
                    if img_tag:
                        pic = img_tag.get('src', '')
                        if not pic:
                            pic = img_tag.get('data-src', '')
                        if not pic:
                            pic = img_tag.get('data-original', '')
                    if pic and not pic.startswith('http'):
                        pic = urllib.parse.urljoin(self.siteUrl, pic)
                    # TV series?
                    tv_tag = article.find('span', class_='tvshows')
                    if tv_tag:
                        # Fetch the show's detail page
                        show_url = link if link.startswith('http') else f"{self.siteUrl}{link}"
                        self.log(f"Fetching show detail page: {show_url}")
                        show_response = self.fetch(show_url)
                        if show_response and show_response.status_code == 200:
                            show_html = show_response.read().decode('utf-8')
                            show_soup = BeautifulSoup(show_html, 'html.parser')
                            # Collect the per-season links
                            seasons = show_soup.find_all('div', class_='se-q')
                            for season in seasons:
                                season_link = season.find('a')
                                if season_link:
                                    season_url = season_link.get('href', '')
                                    if season_url:
                                        # Season label
                                        season_title = season_link.text.strip()
                                        if not season_title:
                                            season_title = f"Season {len(videos) + 1}"
                                        # Season ID (last path segment)
                                        season_id = season_url.strip('/').split('/')[-1]
                                        # Season number (the site labels seasons "第N季")
                                        season_num = re.search(r'第(\d+)季', season_title)
                                        season_num = season_num.group(1) if season_num else str(len(videos) + 1)
                                        videos.append({
                                            "vod_id": season_id,
                                            "vod_name": f"{title} Season {season_num}",
                                            "vod_pic": pic,
                                            "vod_remarks": f"Season {season_num}",
                                            "vod_type": "TV series"
                                        })
                                        self.log(f"Added TV season: {title} Season {season_num}")
                    else:
                        # Movies are added directly
                        vid = self.extract_vid(link)
                        if vid:
                            videos.append({
                                "vod_id": vid,
                                "vod_name": title,
                                "vod_pic": pic,
                                "vod_remarks": "Movie",
                                "vod_type": "Movie"
                            })
                            self.log(f"Added movie: {title}")
                except Exception as e:
                    self.log(f"Error parsing search result: {str(e)}", "ERROR")
                    continue
            result = {
                'list': videos
            }
        except Exception as e:
            self.log(f"Search failed: {str(e)}", "ERROR")
        return result

    def searchContentPage(self, key, quick, pg=1):
        return self.searchContent(key, quick, pg)

    def playerContent(self, flag, id, vipFlags):
        """Resolve a play item ("<tid>/<index>") into a direct stream URL."""
        result = {}
        try:
            self.log(f"Resolving play link: flag={flag}, id={id}")
            # Try the TV-series detail page first
            detail_url = f"{self.siteUrl}/seasons/{id.split('/')[0]}"
            self.log(f"Trying TV-series detail page: {detail_url}")
            response = self.fetch(detail_url)
            if response and response.status_code == 200:
                # It is a TV series
                self.log("Confirmed TV-series detail page")
                html = response.read().decode('utf-8')
                # Locate the postid
                postid_match = re.search(r'postid\s*[:=]\s*(\d+)', html)
                if postid_match:
                    postid = postid_match.group(1)
                    self.log(f"Found postid: {postid}")
                    # Fetch the player page
                    player_url = f"{self.siteUrl}/artplayer?id={postid}&source=0&ep={id.split('/')[1]}"
                    self.log(f"Fetching player page: {player_url}")
                    player_response = self.fetch(player_url)
                    if player_response and player_response.status_code == 200:
                        player_html = player_response.read().decode('utf-8')
                        # Locate the m3u8 link
                        m3u8_match = re.search(r'url\s*[:=]\s*[\'"]([^\'"]+\.m3u8)[\'"]', player_html)
                        if m3u8_match:
                            m3u8_url = m3u8_match.group(1)
                            result["parse"] = 0
                            result["playUrl"] = m3u8_url
                            result["url"] = ''
                            result["headers"] = {
                                "Connection": "keep-alive",
                                "Content-Type": "application/x-www-form-urlencoded",
                                "user-agent": "okhttp/4.10.0",
                                "user_agent": "Mozilla/5.0 (Linux; Android 9; ASUS_I003DD Build/PI; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/68.0.3440.70 Mobile Safari/537.36",
                                "Referer": self.siteUrl,
                                "Accept-Encoding": "gzip"
                            }
                            self.log(f"Found m3u8 link: {m3u8_url}")
            else:
                # Not a TV series; fall back to the movie flow
                self.log("Not a TV series; trying the movie detail page")
                detail_url = f"{self.siteUrl}/movies/{id.split('/')[0]}"
                self.log(f"Fetching movie detail page: {detail_url}")
                postid = id.split('/')[1]
                response = self.fetch(detail_url)
                if response and response.status_code == 200:
                    html = response.read().decode('utf-8')
                    # Resolve the m3u8 link through the artplayer endpoint
                    artplayer_url = f"{self.siteUrl}/artplayer?mvsource=0&id={postid}&type=hls"
                    self.log(f"Fetching artplayer page: {artplayer_url}")
                    artplayer_response = self.fetch(artplayer_url)
                    if artplayer_response and artplayer_response.status_code == 200:
                        artplayer_html = artplayer_response.read().decode('utf-8')
                        # Parse the page
                        soup = BeautifulSoup(artplayer_html, 'html.parser')
                        # Scan every inline script
                        scripts = soup.find_all('script')
                        for script in scripts:
                            if script.string:
                                # Look for an m3u8 URL
                                m3u8_matches = re.findall(r'url\s*[:=]\s*[\'"]([^\'"]+\.m3u8)[\'"]', script.string)
                                if m3u8_matches:
                                    m3u8_url = m3u8_matches[0]  # take the first match
                                    result["parse"] = 0
                                    result["playUrl"] = m3u8_url
                                    result["url"] = ''
                                    result["headers"] = {
                                        "Connection": "keep-alive",
                                        "Content-Type": "application/x-www-form-urlencoded",
                                        "user-agent": "okhttp/4.10.0",
                                        "user_agent": "Mozilla/5.0 (Linux; Android 9; ASUS_I003DD Build/PI; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/68.0.3440.70 Mobile Safari/537.36",
                                        "Referer": self.siteUrl,
                                        "Accept-Encoding": "gzip"
                                    }
                                    self.log(f"Found m3u8 link: {m3u8_url}")
                                    return result
                                # Look for other candidate video sources
                                source_matches = re.findall(r'source\s*[:=]\s*[\'"]([^\'"]+)[\'"]', script.string)
                                if source_matches:
                                    for source in source_matches:
                                        if '.m3u8' in source:
                                            result["parse"] = 0
                                            result["playUrl"] = source
                                            result["url"] = ''
                                            result["headers"] = {
                                                "Connection": "keep-alive",
                                                "Content-Type": "application/x-www-form-urlencoded",
                                                "user-agent": "okhttp/4.10.0",
                                                "user_agent": "Mozilla/5.0 (Linux; Android 9; ASUS_I003DD Build/PI; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/68.0.3440.70 Mobile Safari/537.36",
                                                "Referer": self.siteUrl,
                                                "Accept-Encoding": "gzip"
                                            }
                                            self.log(f"Found video source: {source}")
                                            return result
                        self.log("No m3u8 link found")
                    else:
                        self.log(f"Failed to fetch artplayer page: {artplayer_url}")
                else:
                    self.log(f"Failed to fetch movie detail page: {response.status_code if response else 'no response'}")
        except Exception as e:
            self.log(f"Error resolving play link: {e}")
            self.log(traceback.format_exc())
        return result

    def localProxy(self, param):
        """Local proxy hook (unused)."""
        return [404, "text/plain", {}, "Not Found"]
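

# Minimal local smoke test -- a debugging sketch only. It assumes the file is run
# directly (so the stand-in Spider base above is used) and that www.4kvm.net is
# reachable; the search keyword is an arbitrary example.
if __name__ == "__main__":
    spider = Spider()
    spider.init()
    home = spider.homeContent(False)
    print(f"home items: {len(home.get('list', []))}")
    results = spider.searchContent("4k", False)
    print(f"search items: {len(results.get('list', []))}")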