fun9/py/get_iptv.py
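
"""Build IPTV playlists (m3u/txt) from multiple online sources.

Added summary, inferred from the code below: the script loads a channel
template (py/config/iptv.txt) and a list of source URLs (py/config/urls.txt),
fetches every source concurrently, matches source channels against the
template by case-insensitive substring, writes lib/iptv.m3u and lib/iptv.txt,
reports unmatched channels to py/config/iptv_test.txt, and finally prunes
template entries that matched nothing.
"""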

import re
import os
import requests
import logging
import shutil
import threading
from collections import OrderedDict
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Global lock for file writes (in case multi-threaded writing is ever needed)
write_lock = threading.Lock()


def get_session():
    """Create a requests Session with a retry policy."""
    session = requests.Session()
    retry = Retry(connect=3, backoff_factor=0.5)
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session


def load_urls_from_file(file_path):
    """Load the list of source URLs from a text file."""
    urls = []
    if not os.path.exists(file_path):
        logger.warning(f"URL config file not found: {file_path}")
        return urls
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                # Skip blank lines and comment lines starting with '#'
                if line and not line.startswith("#"):
                    urls.append(line)
        logger.info(f"Loaded {len(urls)} sources from {file_path}")
    except Exception as e:
        logger.error(f"Failed to read URL file: {e}")
    return urls
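
# Illustrative urls.txt layout (example data, not taken from the repository;
# inferred from load_urls_from_file(): one source URL per line, '#' lines are comments):
#
#   # sources
#   https://example.com/live.m3u
#   https://example.com/channels.txt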


def parse_template(template_file):
    """Parse the channel template file into {category: [channel names]}."""
    template_channels = OrderedDict()
    current_category = None
    try:
        with open(template_file, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                if "#genre#" in line:
                    current_category = line.split(",")[0].strip()
                    template_channels[current_category] = []
                elif current_category:
                    channel_name = line.split(",")[0].strip()
                    template_channels[current_category].append(channel_name)
    except FileNotFoundError:
        logger.error(f"Template file not found: {template_file}")
    return template_channels
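
# Illustrative template layout (example entries, not taken from the repository;
# inferred from parse_template() and the "|" variant handling in match_channels()):
#
#   央视频道,#genre#
#   CCTV1|CCTV-1
#   CCTV2
#   卫视频道,#genre#
#   湖南卫视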


def fetch_channels(url):
    """Fetch a channel list from a URL; supports both M3U and TXT formats."""
    channels = OrderedDict()
    session = get_session()
    try:
        response = session.get(url, timeout=30)
        response.raise_for_status()
        response.encoding = response.apparent_encoding or "utf-8"  # auto-detect encoding
        lines = [line.strip() for line in response.text.splitlines() if line.strip()]
        if not lines:
            return channels
        # Decide whether this is an M3U file (check the first few lines)
        is_m3u = any("#EXTINF" in line for line in lines[:10])
        if is_m3u:
            current_category = "默认分类"
            current_name = "未知频道"
            # Pre-compile the regexes
            re_group = re.compile(r'group-title="([^"]*)"')
            re_name = re.compile(r',([^,]*)$')
            for line in lines:
                if line.startswith("#EXTINF"):
                    # Extract the category
                    group_match = re_group.search(line)
                    if group_match:
                        current_category = group_match.group(1).strip()
                    # Extract the channel name
                    name_match = re_name.search(line)
                    if name_match:
                        current_name = name_match.group(1).strip()
                elif not line.startswith("#") and "://" in line:
                    # This is a URL line
                    if current_category not in channels:
                        channels[current_category] = []
                    # Drop entries with no usable name
                    if current_name and current_name != "未知频道":
                        channels[current_category].append((current_name, line))
                    # Reset the name so the next URL does not reuse a stale one
                    current_name = "未知频道"
        else:
            # TXT format: "Category,#genre#" header lines followed by "Name,URL" lines
            current_category = None
            for line in lines:
                if "#genre#" in line:
                    current_category = line.split(",")[0].strip()
                    if current_category not in channels:
                        channels[current_category] = []
                elif current_category and "," in line:
                    parts = line.split(",", 1)
                    if len(parts) == 2:
                        name, url = parts
                        if name.strip() and url.strip():
                            channels[current_category].append((name.strip(), url.strip()))
        return channels
    except requests.exceptions.RequestException as e:
        logger.error(f"Request to {url} failed: {e}")
        return OrderedDict()
    except Exception as e:
        logger.error(f"Unexpected error while processing {url}: {e}")
        return OrderedDict()
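
# Illustrative inputs fetch_channels() understands (example data, not from the
# original sources):
#
#   M3U:  #EXTINF:-1 group-title="央视频道",CCTV-1 FHD
#         http://example.com/cctv1.m3u8
#   TXT:  央视频道,#genre#
#         CCTV-1,http://example.com/cctv1.m3u8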


def match_channels(template_channels, all_channels):
    """
    Match source channels against the template (optimized version).
    """
    matched = OrderedDict()
    unmatched_template = OrderedDict()
    # 1. Flatten all source channels into one list to avoid nested lookups.
    #    Each entry is a dict with keys: norm_name, name, url, cat, key;
    #    norm_name is the lower-cased name used for case-insensitive matching.
    flattened_source_channels = []
    for cat, chans in all_channels.items():
        for name, url in chans:
            flattened_source_channels.append({
                'norm_name': name.lower(),
                'name': name,
                'url': url,
                'cat': cat,
                'key': f"{name}_{url}"  # unique key used for de-duplication
            })
    used_channel_keys = set()
    # Initialize the output structures
    for cat in template_channels:
        matched[cat] = OrderedDict()
        unmatched_template[cat] = []
    # 2. Matching logic
    for category, tmpl_names in template_channels.items():
        for tmpl_name in tmpl_names:
            # Expand variants: "CCTV1|CCTV-1" -> ["CCTV1", "CCTV-1"]
            variants = [n.strip() for n in tmpl_name.split("|") if n.strip()]
            found_for_this_template = False
            # Try to match each variant
            for variant in variants:
                variant_lower = variant.lower()
                # Search the flattened source list.
                # Optimization: plain substring containment instead of regex.
                # If exact matches should win, run two passes; here we keep the
                # original "contains means match" behaviour.
                for src in flattened_source_channels:
                    # Skip channels that were already claimed
                    if src['key'] in used_channel_keys:
                        continue
                    # Core rule: the source channel name contains the template variant,
                    # e.g. template "CCTV-1" matches source "CCTV-1 FHD"
                    if variant_lower in src['norm_name']:
                        if src['name'] not in matched[category]:
                            matched[category][src['name']] = []
                        matched[category][src['name']].append((src['name'], src['url']))
                        used_channel_keys.add(src['key'])
                        found_for_this_template = True
                        # Note: intentionally no break (as in the pre-optimization code),
                        # so one variant may match several source channels (multiple streams)
            if not found_for_this_template:
                unmatched_template[category].append(tmpl_name)
    # 3. Collect source channels that were never used
    unmatched_source = OrderedDict()
    for src in flattened_source_channels:
        if src['key'] not in used_channel_keys:
            if src['cat'] not in unmatched_source:
                unmatched_source[src['cat']] = []
            unmatched_source[src['cat']].append((src['name'], src['url']))
    return matched, unmatched_template, unmatched_source


def is_ipv6(url):
    """Check whether a URL points at an IPv6 address."""
    # Naive IPv6 URL detection: http://[2409:...]
    return "://[" in url


def generate_outputs(channels, template_channels):
    """Write the matched channels to the m3u and txt output files."""
    written_urls = set()
    channel_counter = 1
    output_m3u_path = "lib/iptv.m3u"
    output_txt_path = "lib/iptv.txt"
    try:
        # Make sure the output directory exists before writing
        os.makedirs(os.path.dirname(output_m3u_path), exist_ok=True)
        with write_lock:
            with open(output_m3u_path, "w", encoding="utf-8") as m3u, \
                 open(output_txt_path, "w", encoding="utf-8") as txt:
                m3u.write("#EXTM3U\n")
                for category in template_channels:
                    if category not in channels or not channels[category]:
                        continue
                    txt.write(f"\n{category},#genre#\n")
                    # Walk the matched channels in this category
                    for channel_key_name, channel_list in channels[category].items():
                        # De-duplication: drop repeated URLs under the same channel name
                        unique_urls = []
                        seen_urls = set()
                        for _, url in channel_list:
                            if url not in seen_urls and url not in written_urls:
                                unique_urls.append(url)
                                seen_urls.add(url)
                                written_urls.add(url)
                        total_lines = len(unique_urls)
                        for idx, url in enumerate(unique_urls, 1):
                            # Build the suffix
                            base_url = url.split("$")[0]  # strip any pre-existing suffix
                            suffix_name = "IPV6" if is_ipv6(url) else "IPV4"
                            # Display name
                            display_name = channel_key_name
                            # Final URL annotation, format: $LR•IPV4•2『线路1』
                            meta_suffix = f"$LR•{suffix_name}"
                            if total_lines > 1:
                                meta_suffix += f"•{total_lines}『线路{idx}』"
                            final_url = f"{base_url}{meta_suffix}"
                            # Write the M3U entry
                            m3u.write(f'#EXTINF:-1 tvg-id="{channel_counter}" tvg-name="{channel_key_name}" group-title="{category}",{display_name}\n')
                            m3u.write(f"{final_url}\n")
                            # Write the TXT entry
                            txt.write(f"{display_name},{final_url}\n")
                        channel_counter += 1
        logger.info(f"Output finished; {channel_counter - 1} channels written.")
    except Exception as e:
        logger.error(f"Failed to write output files: {e}")


def generate_unmatched_report(unmatched_template, unmatched_source):
    """Write a report of unmatched channels."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    report_file = "py/config/iptv_test.txt"
    total_template_lost = sum(len(v) for v in unmatched_template.values())
    total_source_lost = sum(len(v) for v in unmatched_source.values())
    try:
        with open(report_file, "w", encoding="utf-8") as f:
            f.write(f"# 未匹配频道报告 - {timestamp}\n")
            f.write(f"# 模板未匹配数: {total_template_lost}\n")
            f.write(f"# 源未利用频道数: {total_source_lost}\n\n")
            f.write("## 1. 模板中存在但源中未找到的频道 (建议从模板删除)\n")
            for cat, names in unmatched_template.items():
                if names:
                    f.write(f"\n{cat},#genre#\n")
                    # De-duplicate while preserving order
                    for name in list(OrderedDict.fromkeys(names)):
                        f.write(f"{name},\n")
            f.write("\n\n## 2. 源中存在但模板未收录的频道 (建议添加到模板)\n")
            for cat, chans in unmatched_source.items():
                if chans:
                    f.write(f"\n{cat},#genre#\n")
                    # Record names only
                    unique_names = list(OrderedDict.fromkeys([c[0] for c in chans]))
                    for name in unique_names:
                        f.write(f"{name},\n")
        logger.info(f"Unmatched-channel report written to: {report_file}")
        return total_template_lost
    except Exception as e:
        logger.error(f"Failed to generate the report: {e}")
        return 0


def remove_unmatched_from_template(template_file, unmatched_template):
    """Back up the template, then remove the entries that matched nothing."""
    backup_file = template_file + ".backup"
    try:
        shutil.copy2(template_file, backup_file)
        logger.info(f"Template backed up to: {backup_file}")
        with open(template_file, "r", encoding="utf-8") as f:
            lines = f.readlines()
        new_lines = []
        current_cat = None
        # Build per-category removal sets for fast lookup: {"央视": {"CCTV-99", ...}}
        to_remove = {cat: set(names) for cat, names in unmatched_template.items()}
        for line in lines:
            stripped = line.strip()
            if not stripped or stripped.startswith("#"):
                new_lines.append(line)
                continue
            if "#genre#" in stripped:
                current_cat = stripped.split(",")[0].strip()
                new_lines.append(line)
                continue
            if current_cat:
                name = stripped.split(",")[0].strip()
                # Check whether this entry is on the removal list.
                # Note: a template entry may be "CCTV1|CCTV-1"; the unmatched list
                # stores the whole string, so the comparison uses the full entry.
                if current_cat in to_remove and name in to_remove[current_cat]:
                    logger.info(f"Removing dead channel: [{current_cat}] {name}")
                    continue
            new_lines.append(line)
        with open(template_file, "w", encoding="utf-8") as f:
            f.writelines(new_lines)
        logger.info("Template file updated.")
    except Exception as e:
        logger.error(f"Failed to update the template: {e}")


def main(template_file, tv_urls):
    # 1. Parse the template
    logger.info("Parsing template...")
    template = parse_template(template_file)
    # 2. Fetch the sources concurrently
    logger.info(f"Fetching data from {len(tv_urls)} sources...")
    all_channels = OrderedDict()
    with ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = {executor.submit(fetch_channels, url): url for url in tv_urls}
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
                if data:
                    for cat, chans in data.items():
                        if cat not in all_channels:
                            all_channels[cat] = []
                        all_channels[cat].extend(chans)
                    logger.info(f"{url} fetched successfully: {sum(len(v) for v in data.values())} channels")
                else:
                    logger.warning(f"{url} returned no data")
            except Exception as e:
                logger.error(f"{url} raised an exception: {e}")
    # 3. Core matching
    logger.info("Matching channels...")
    matched, unmatched_tmpl, unmatched_src = match_channels(template, all_channels)
    # 4. Generate the playlist files
    logger.info("Generating playlist files...")
    generate_outputs(matched, template)
    # 5. Generate the report
    lost_count = generate_unmatched_report(unmatched_tmpl, unmatched_src)
    # 6. (Optional) Automatically prune the template
    if lost_count > 0:
        logger.info(f"{lost_count} template channels had no match; removing them from the template...")
        remove_unmatched_from_template(template_file, unmatched_tmpl)
    else:
        logger.info("Every template channel was matched.")


if __name__ == "__main__":
    # Configuration
    TEMPLATE_FILE = "py/config/iptv.txt"
    URLS_FILE = "py/config/urls.txt"
    TV_URLS = load_urls_from_file(URLS_FILE)
    main(TEMPLATE_FILE, TV_URLS)
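
# Expected file layout, inferred from the paths hard-coded above:
#   py/config/iptv.txt        - channel template (input, pruned in place)
#   py/config/urls.txt        - source URL list (input)
#   py/config/iptv_test.txt   - unmatched-channel report (output)
#   lib/iptv.m3u, lib/iptv.txt - generated playlists (output)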