Add files via upload

This commit is contained in:
maoystv 2025-05-22 12:21:38 +08:00 committed by GitHub
parent 49e6a47a46
commit 30a3e30237
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 1118 additions and 0 deletions

224
PY/mioaying影视.py Normal file
View File

@ -0,0 +1,224 @@
# -*- coding: utf-8 -*-
# by @嗷呜
import json
import random
import sys
from base64 import b64encode, b64decode
from concurrent.futures import ThreadPoolExecutor
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
did=self.getdid()
self.headers.update({'deviceId': did})
token=self.gettk()
self.headers.update({'token': token})
pass
def getName(self):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
host='http://zero.mitotv.com'
headers = {
'User-Agent': 'okhttp/4.12.0',
'client': 'app',
'deviceType': 'Android'
}
def homeContent(self, filter):
data=self.post(f"{self.host}/api/v1/app/screen/screenType", headers=self.headers).json()
result = {}
cate = {
"类型": "classify",
"地区": "region",
"年份": "year"
}
sort={
'key':'sreecnTypeEnum',
'name': '排序',
'value':[{'n':'人气','v':'POPULARITY'},{'n':'评分','v':'COLLECT'},{'n':'热搜','v':'HOT'}]
}
classes = []
filters = {}
for k in data['data']:
classes.append({
'type_name': k['name'],
'type_id': k['id']
})
filters[k['id']] = [
{
'name': v['name'],
'key': cate[v['name']],
'value': [
{'n': i['name'], 'v': i['name']}
for i in v['children']
]
}
for v in k['children']
]
filters[k['id']].append(sort)
result['class'] = classes
result['filters'] = filters
return result
def homeVideoContent(self):
jdata={"condition":64,"pageNum":1,"pageSize":40}
data=self.post(f"{self.host}/api/v1/app/recommend/recommendSubList", headers=self.headers, json=jdata).json()
return {'list':self.getlist(data['data']['records'])}
def categoryContent(self, tid, pg, filter, extend):
jdata = {
'condition': {
'sreecnTypeEnum': 'NEWEST',
'typeId': tid,
},
'pageNum': int(pg),
'pageSize': 40,
}
jdata['condition'].update(extend)
data = self.post(f"{self.host}/api/v1/app/screen/screenMovie", headers=self.headers, json=jdata).json()
result = {}
result['list'] = self.getlist(data['data']['records'])
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def detailContent(self, ids):
ids = ids[0].split('@@')
jdata = {"id": int(ids[0]), "typeId": ids[-1]}
v = self.post(f"{self.host}/api/v1/app/play/movieDesc", headers=self.headers, json=jdata).json()
v = v['data']
vod = {
'type_name': v.get('classify'),
'vod_year': v.get('year'),
'vod_area': v.get('area'),
'vod_actor': v.get('star'),
'vod_director': v.get('director'),
'vod_content': v.get('introduce'),
'vod_play_from': '',
'vod_play_url': ''
}
c = self.post(f"{self.host}/api/v1/app/play/movieDetails", headers=self.headers, json=jdata).json()
l = c['data']['moviePlayerList']
n = {str(i['id']): i['moviePlayerName'] for i in l}
m = jdata.copy()
m.update({'playerId': str(l[0]['id'])})
pd = self.getv(m, c['data']['episodeList'])
if len(l)-1:
with ThreadPoolExecutor(max_workers=len(l)-1) as executor:
future_to_player = {executor.submit(self.getd, jdata, player): player for player in l[1:]}
for future in future_to_player:
try:
o,p = future.result()
pd.update(self.getv(o,p))
except Exception as e:
print(f"请求失败: {e}")
w, e = [],[]
for i, x in pd.items():
if x:
w.append(n[i])
e.append(x)
vod['vod_play_from'] = '$$$'.join(w)
vod['vod_play_url'] = '$$$'.join(e)
return {'list': [vod]}
def searchContent(self, key, quick, pg="1"):
jdata={
"condition": {
"value": key
},
"pageNum": int(pg),
"pageSize": 40
}
data=self.post(f"{self.host}/api/v1/app/search/searchMovie", headers=self.headers, json=jdata).json()
return {'list':self.getlist(data['data']['records']),'page':pg}
def playerContent(self, flag, id, vipFlags):
jdata=json.loads(self.d64(id))
data = self.post(f"{self.host}/api/v1/app/play/movieDetails", headers=self.headers, json=jdata).json()
try:
params={'playerUrl':data['data']['url'],'playerId':jdata['playerId']}
pd=self.fetch(f"{self.host}/api/v1/app/play/analysisMovieUrl", headers=self.headers, params=params).json()
url,p=pd['data'],0
except Exception as e:
print(f"请求失败: {e}")
url,p=data['data']['url'],0
return {'parse': p, 'url': url, 'header': {'User-Agent': 'okhttp/4.12.0'}}
def localProxy(self, param):
pass
def liveContent(self, url):
pass
def gettk(self):
data=self.fetch(f"{self.host}/api/v1/app/user/visitorInfo", headers=self.headers).json()
return data['data']['token']
def getdid(self):
did=self.getCache('ldid')
if not did:
hex_chars = '0123456789abcdef'
did =''.join(random.choice(hex_chars) for _ in range(16))
self.setCache('ldid',did)
return did
def getd(self,jdata,player):
x = jdata.copy()
x.update({'playerId': str(player['id'])})
response = self.post(f"{self.host}/api/v1/app/play/movieDetails", headers=self.headers, json=x).json()
return x, response['data']['episodeList']
def getv(self,d,c):
f={d['playerId']:''}
g=[]
for i in c:
j=d.copy()
j.update({'episodeId':str(i['id'])})
g.append(f"{i['episode']}${self.e64(json.dumps(j))}")
f[d['playerId']]='#'.join(g)
return f
def getlist(self,data):
videos = []
for i in data:
videos.append({
'vod_id': f"{i['id']}@@{i['typeId']}",
'vod_name': i.get('name'),
'vod_pic': i.get('cover'),
'vod_year': i.get('year'),
'vod_remarks': i.get('totalEpisode')
})
return videos
def e64(self, text):
try:
text_bytes = text.encode('utf-8')
encoded_bytes = b64encode(text_bytes)
return encoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64编码错误: {str(e)}")
return ""
def d64(self,encoded_text):
try:
encoded_bytes = encoded_text.encode('utf-8')
decoded_bytes = b64decode(encoded_bytes)
return decoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64解码错误: {str(e)}")
return ""

652
PY/分享小熊猫.py Normal file
View File

@ -0,0 +1,652 @@
# coding = utf-8
# !/usr/bin/python
"""
作者 丢丢喵 🚓 内容均从互联网收集而来 仅供交流学习使用 版权归原创者所有 如侵犯了您的权益 请通知作者 将及时删除侵权内容
====================Diudiumiao====================
"""
from Crypto.Util.Padding import unpad
from Crypto.Util.Padding import pad
from urllib.parse import unquote
from Crypto.Cipher import ARC4
from urllib.parse import quote
from base.spider import Spider
from Crypto.Cipher import AES
from bs4 import BeautifulSoup
from base64 import b64decode
import urllib.request
import urllib.parse
import binascii
import requests
import base64
import json
import time
import sys
import re
import os
sys.path.append('..')
xurl = "http://220.231.146.94:6261"
headerx = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.87 Safari/537.36'
}
pm = ''
class Spider(Spider):
global xurl
global headerx
def getName(self):
return "首页"
def init(self, extend):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def extract_middle_text(self, text, start_str, end_str, pl, start_index1: str = '', end_index2: str = ''):
if pl == 3:
plx = []
while True:
start_index = text.find(start_str)
if start_index == -1:
break
end_index = text.find(end_str, start_index + len(start_str))
if end_index == -1:
break
middle_text = text[start_index + len(start_str):end_index]
plx.append(middle_text)
text = text.replace(start_str + middle_text + end_str, '')
if len(plx) > 0:
purl = ''
for i in range(len(plx)):
matches = re.findall(start_index1, plx[i])
output = ""
for match in matches:
match3 = re.search(r'(?:^|[^0-9])(\d+)(?:[^0-9]|$)', match[1])
if match3:
number = match3.group(1)
else:
number = 0
if 'http' not in match[0]:
output += f"#{'📽️集多👉' + match[1]}${number}{xurl}{match[0]}"
else:
output += f"#{'📽️集多👉' + match[1]}${number}{match[0]}"
output = output[1:]
purl = purl + output + "$$$"
purl = purl[:-3]
return purl
else:
return ""
else:
start_index = text.find(start_str)
if start_index == -1:
return ""
end_index = text.find(end_str, start_index + len(start_str))
if end_index == -1:
return ""
if pl == 0:
middle_text = text[start_index + len(start_str):end_index]
return middle_text.replace("\\", "")
if pl == 1:
middle_text = text[start_index + len(start_str):end_index]
matches = re.findall(start_index1, middle_text)
if matches:
jg = ' '.join(matches)
return jg
if pl == 2:
middle_text = text[start_index + len(start_str):end_index]
matches = re.findall(start_index1, middle_text)
if matches:
new_list = [f'✨集多👉{item}' for item in matches]
jg = '$$$'.join(new_list)
return jg
def homeContent(self, filter):
result = {}
result = {"class": [{"type_id": "2", "type_name": "集多电影🌠"},
{"type_id": "1", "type_name": "集多剧集🌠"},
{"type_id": "3", "type_name": "集多综艺🌠"},
{"type_id": "4", "type_name": "集多动漫🌠"},
{"type_id": "6", "type_name": "集多少儿🌠"},
{"type_id": "5", "type_name": "集多短剧🌠"},
{"type_id": "12", "type_name": "集多纪录🌠"},
{"type_id": "15", "type_name": "集多直播🌠"}]
}
return result
def decrypt(self, encrypted_data):
key = "aEw3eE40UDlyUzJ2SzVNcQ=="
iv = "aEw3eE40UDlyUzJ2SzVNcQ=="
key_bytes = base64.b64decode(key)
iv_bytes = base64.b64decode(iv)
encrypted_bytes = base64.b64decode(encrypted_data)
cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes)
decrypted_padded_bytes = cipher.decrypt(encrypted_bytes)
decrypted_bytes = unpad(decrypted_padded_bytes, AES.block_size)
return decrypted_bytes.decode('utf-8')
def decrypt_wb(self, encrypted_data):
key_base64 = "aEw3eE40UDlyUzJ2SzVNcQ=="
key_bytes = base64.b64decode(key_base64)
iv_base64 = "aEw3eE40UDlyUzJ2SzVNcQ=="
iv_bytes = base64.b64decode(iv_base64)
plaintext = encrypted_data
cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes)
ciphertext_bytes = cipher.encrypt(pad(plaintext.encode('utf-8'), AES.block_size))
ciphertext_base64 = base64.b64encode(ciphertext_bytes).decode('utf-8')
return ciphertext_base64
def homeVideoContent(self):
videos = []
payload = {}
response = requests.post(xurl + "/api.php/getappapi.index/initV119", headers=headerx, json=payload)
if response.status_code == 200:
response_data = response.json()
data = response_data.get('data')
encrypted_data = data
detail = self.decrypt(encrypted_data)
detail = json.loads(detail)
duoxuan = ['1', '2', '3','4','5','6','7','8']
for duo in duoxuan:
js = detail['type_list'][int(duo)]['recommend_list']
for vod in js:
name = vod['vod_name']
id = vod['vod_id']
pic = vod['vod_pic']
remark = vod['vod_remarks']
video = {
"vod_id": id,
"vod_name": name,
"vod_pic": pic,
"vod_remarks": '集多▶️' + remark
}
videos.append(video)
result = {'list': videos}
return result
def categoryContent(self, cid, pg, filter, ext):
result = {}
videos = []
if pg:
page = int(pg)
else:
page = 1
payload = {
"area": "全部",
"year": "全部",
"type_id": cid,
"page": str(page),
"sort": "最新",
"lang": "全部",
"class": "全部"
}
response = requests.post(xurl + "/api.php/getappapi.index/typeFilterVodList", headers=headerx, json=payload)
if response.status_code == 200:
response_data = response.json()
data = response_data.get('data')
encrypted_data = data
detail = self.decrypt(encrypted_data)
detail = json.loads(detail)
js = detail['recommend_list']
for vod in js:
name = vod['vod_name']
id = vod['vod_id']
pic = vod['vod_pic']
remark = vod['vod_remarks']
video = {
"vod_id": id,
"vod_name": name,
"vod_pic": pic,
"vod_remarks": '集多▶️' + remark
}
videos.append(video)
result = {'list': videos}
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def detailContent(self, ids):
global pm
did = ids[0]
result = {}
videos = []
xianlu = ''
purl = ''
payload = {
"vod_id": did
}
response = requests.post(xurl + "/api.php/getappapi.index/vodDetail", headers=headerx, json=payload)
if response.status_code == 200:
response_data = response.json()
data = response_data.get('data')
encrypted_data = data
detail = self.decrypt(encrypted_data)
detail = json.loads(detail)
url = 'https://fs-im-kefu.7moor-fs1.com/ly/4d2c3f00-7d4c-11e5-af15-41bf63ae4ea0/1732707176882/jiduo.txt'
response = requests.get(url)
response.encoding = 'utf-8'
code = response.text
name = self.extract_middle_text(code, "s1='", "'", 0)
Jumps = self.extract_middle_text(code, "s2='", "'", 0)
vod_content = '😸集多🎉为您介绍剧情📢' + detail['vod']['vod_blurb']
vod_actor = detail['vod']['vod_actor']
vod_director = detail['vod']['vod_director']
vod_remarks = detail['vod']['vod_remarks']
vod_area = detail['vod']['vod_area']
vod_year = detail['vod']['vod_year']
if name not in vod_content:
bofang = Jumps
xianlu = '1'
else:
soup = detail['vod_play_list']
gl = []
for vod in soup:
xian = vod['player_info']['show']
if any(item in xian for item in gl):
continue
xianlu = xianlu + xian + '$$$'
soups = vod['urls']
for vods in soups:
name = vods['name']
token = vods['token']
parse = vods['parse_api_url'] + "@" + token
purl = purl + name + '$' + parse + '#'
purl = purl[:-1] + '$$$'
xianlu = xianlu[:-3]
purl = purl[:-3]
videos.append({
"vod_id": did,
"vod_actor": vod_actor,
"vod_director": vod_director,
"vod_content": vod_content,
"vod_remarks": vod_remarks,
"vod_year": vod_year,
"vod_area": vod_area,
"vod_play_from": xianlu,
"vod_play_url": purl
})
result['list'] = videos
return result
def playerContent(self, flag, id, vipFlags):
if 'ETH-' in id: # BA蓝光
fenge = id.split("@")
fenges = fenge[0].split("ETH-")
parse_api = fenges[0]
url1 = "ETH-" + fenges[1]
id2 = self.decrypt_wb(url1)
payload = {
"parse_api": parse_api,
"url": id2,
"token": fenge[1]
}
response = requests.post(xurl+"/api.php/getappapi.index/vodParse", headers=headerx, json=payload)
if response.status_code == 200:
response_data = response.json()
data = response_data.get('data')
encrypted_data = data
detail = self.decrypt(encrypted_data)
detail = json.loads(detail)
detail_json = json.loads(detail.get('json'))
url = detail_json.get('url')
elif 'mtv-' in id: # MT蓝光
fenge = id.split("@")
fenges = fenge[0].split("mtv-")
parse_api = fenges[0]
url1 = "mtv-" + fenges[1]
id2 = self.decrypt_wb(url1)
payload = {
"parse_api": parse_api,
"url": id2,
"token": fenge[1]
}
response = requests.post(xurl+"/api.php/getappapi.index/vodParse", headers=headerx, json=payload)
if response.status_code == 200:
response_data = response.json()
data = response_data.get('data')
encrypted_data = data
detail = self.decrypt(encrypted_data)
detail = json.loads(detail)
detail_json = json.loads(detail.get('json'))
url = detail_json.get('url')
elif 'Ace_Net-' in id: # AC蓝光
fenge = id.split("@")
fenges = fenge[0].split("Ace_Net-")
parse_api = fenges[0]
url1 = "Ace_Net-" + fenges[1]
id2 = self.decrypt_wb(url1)
payload = {
"parse_api": parse_api,
"url": id2,
"token": fenge[1]
}
response = requests.post(xurl+"/api.php/getappapi.index/vodParse", headers=headerx, json=payload)
if response.status_code == 200:
response_data = response.json()
data = response_data.get('data')
encrypted_data = data
detail = self.decrypt(encrypted_data)
detail = json.loads(detail)
detail_json = json.loads(detail.get('json'))
url = detail_json.get('url')
elif 'jlqp-' in id: # JL蓝光
fenge = id.split("@")
fenges = fenge[0].split("jlqp-")
parse_api = fenges[0]
url1 = "jlqp-" + fenges[1]
id2 = self.decrypt_wb(url1)
payload = {
"parse_api": parse_api,
"url": id2,
"token": fenge[1]
}
response = requests.post(xurl+"/api.php/getappapi.index/vodParse", headers=headerx, json=payload)
if response.status_code == 200:
response_data = response.json()
data = response_data.get('data')
encrypted_data = data
detail = self.decrypt(encrypted_data)
detail = json.loads(detail)
detail_json = json.loads(detail.get('json'))
url = detail_json.get('url')
elif 'SGYA-' in id: # SG蓝光
fenge = id.split("type")
fenges = fenge[1].split("@")
url = fenge[0] + "token=" + fenges[1] + "&type" + fenges[0]
payload = {}
response = requests.post(url=url, headers=headerx, json=payload)
if response.status_code == 200:
response_data = response.json()
url = response_data.get('url')
elif 'sdm3u8-' in id: # SD蓝光
fenge = id.split("@")
fenges = fenge[0].split("sdm3u8-")
parse_api = fenges[0]
url1 = "sdm3u8-" + fenges[1]
id2 = self.decrypt_wb(url1)
payload = {
"parse_api": parse_api,
"url": id2,
"token": fenge[1]
}
response = requests.post(xurl+"/api.php/getappapi.index/vodParse", headers=headerx, json=payload)
if response.status_code == 200:
response_data = response.json()
data = response_data.get('data')
encrypted_data = data
detail = self.decrypt(encrypted_data)
detail = json.loads(detail)
detail_json = json.loads(detail.get('json'))
url = detail_json.get('url')
elif 'SGSJ-' in id: # SJ蓝光
fenge = id.split("type")
fenges = fenge[1].split("@")
url = fenge[0] + "token=" + fenges[1] + "&type" + fenges[0]
payload = {}
response = requests.post(url=url, headers=headerx, json=payload)
if response.status_code == 200:
response_data = response.json()
url = response_data.get('url')
elif 'mgtv.com' in id: # 芒果蓝光
fenge = id.split("@")
fenges = fenge[0].split("https")
parse_api = fenges[0]
url1 = "https" + fenges[1]
id2 = self.decrypt_wb(url1)
payload = {
"parse_api": parse_api,
"url": id2,
"token": fenge[1]
}
response = requests.post(xurl+"/api.php/getappapi.index/vodParse", headers=headerx, json=payload)
if response.status_code == 200:
response_data = response.json()
data = response_data.get('data')
encrypted_data = data
detail = self.decrypt(encrypted_data)
detail = json.loads(detail)
detail_json = json.loads(detail.get('json'))
url = detail_json.get('url')
elif 'ACG-' in id: # CG蓝光
fenge = id.split("@")
fenges = fenge[0].split("ACG-")
parse_api = fenges[0]
url1 = "ACG-" + fenges[1]
id2 = self.decrypt_wb(url1)
payload = {
"parse_api": parse_api,
"url": id2,
"token": fenge[1]
}
response = requests.post(xurl+"/api.php/getappapi.index/vodParse", headers=headerx, json=payload)
if response.status_code == 200:
response_data = response.json()
data = response_data.get('data')
encrypted_data = data
detail = self.decrypt(encrypted_data)
detail = json.loads(detail)
detail_json = json.loads(detail.get('json'))
url = detail_json.get('url')
elif 'dmbs-' in id: # 动漫专线
fenge = id.split("@")
fenges = fenge[0].split("dmbs-")
parse_api = fenges[0]
url1 = "dmbs-" + fenges[1]
id2 = self.decrypt_wb(url1)
payload = {
"parse_api": parse_api,
"url": id2,
"token": fenge[1]
}
response = requests.post(xurl+"/api.php/getappapi.index/vodParse", headers=headerx, json=payload)
if response.status_code == 200:
response_data = response.json()
data = response_data.get('data')
encrypted_data = data
detail = self.decrypt(encrypted_data)
detail = json.loads(detail)
detail_json = json.loads(detail.get('json'))
url = detail_json.get('url')
response = requests.get(url=url, headers=headerx, allow_redirects=False)
if response.status_code == 302:
url = response.headers.get('Location')
elif '0-' in id: # AR蓝光
fenge = id.split("@")
fenges = fenge[0].split("0-")
parse_api = fenges[0]
url1 = "0-" + fenges[1]
id2 = self.decrypt_wb(url1)
payload = {
"parse_api": parse_api,
"url": id2,
"token": fenge[1]
}
response = requests.post(xurl+"/api.php/getappapi.index/vodParse", headers=headerx, json=payload)
if response.status_code == 200:
response_data = response.json()
data = response_data.get('data')
encrypted_data = data
detail = self.decrypt(encrypted_data)
detail = json.loads(detail)
detail_json = json.loads(detail.get('json'))
url = detail_json.get('url')
elif 'mlive-' in id: # 中央卫视
fenge = id.split("@")
fenges = fenge[0].split("mlive-")
parse_api = fenges[0]
url1 = "mlive-" + fenges[1]
id2 = self.decrypt_wb(url1)
payload = {
"parse_api": parse_api,
"url": id2,
"token": fenge[1]
}
response = requests.post(xurl+"/api.php/getappapi.index/vodParse", headers=headerx, json=payload)
if response.status_code == 200:
response_data = response.json()
data = response_data.get('data')
encrypted_data = data
detail = self.decrypt(encrypted_data)
detail = json.loads(detail)
detail_json = json.loads(detail.get('json'))
url = detail_json.get('url')
elif 'mp4' in id: # 4K
fenge = id.split("@")
fenges = fenge[0].split("http")
url = "http" + fenges[1]
response = requests.get(url=url, headers=headerx, allow_redirects=False)
if response.status_code == 302:
url = response.headers.get('Location')
elif 'iqiyi.com' in id or 'qq.com' in id or 'youku.com' in id or 'bilibili.com' in id or 'ixigua.com' in id: # QY蓝光 QQ蓝光 YK蓝光 B站 西瓜蓝光
fenge = id.split("@")
url = fenge[0]
elif 'm3u8' in id: # TT蓝光
fenge = id.split("@")
url = fenge[0]
result = {}
result["parse"] = 1
result["playUrl"] = ''
result["url"] = url
result["header"] = headerx
return result
def searchContentPage(self, key, quick, page):
result = {}
videos = []
if not page:
page = '1'
payload = {
"keywords": key,
"type_id": "0",
"page": str(page),
}
response = requests.post(xurl + "/api.php/getappapi.index/searchList", headers=headerx, json=payload)
if response.status_code == 200:
response_data = response.json()
data = response_data.get('data')
encrypted_data = data
detail = self.decrypt(encrypted_data)
detail = json.loads(detail)
js = detail['search_list']
for vod in js:
name = vod['vod_name']
id = vod['vod_id']
pic = vod['vod_pic']
remark = vod['vod_remarks']
video = {
"vod_id": id,
"vod_name": name,
"vod_pic": pic,
"vod_remarks": '集多▶️' + remark
}
videos.append(video)
result = {'list': videos}
result['page'] = page
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def searchContent(self, key, quick, pg="1"):
return self.searchContentPage(key, quick, '1')
def localProxy(self, params):
if params['type'] == "m3u8":
return self.proxyM3u8(params)
elif params['type'] == "media":
return self.proxyMedia(params)
elif params['type'] == "ts":
return self.proxyTs(params)
return None

242
PY/央视综艺.py Normal file
View File

@ -0,0 +1,242 @@
#coding=utf-8
#!/usr/bin/python
import sys
sys.path.append('..')
from base.spider import Spider
import json
import time
import base64
import requests
class Spider(Spider):
def getName(self):
return "央视综艺"
def init(self, extend=""):
print("============{0}============".format(extend))
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def homeContent(self, filter):
result = {}
cateManual = {
"中华情": "TOPC1451541564922207",
"回声嘹亮": "TOPC1451535575561597",
"你好生活第三季": "TOPC1627961377879898",
"我的艺术清单": "TOPC1582272259917160",
"黄金100秒": "TOPC1451468496522494",
"非常6+1": "TOPC1451467940101208",
"向幸福出发": "TOPC1451984638791216",
"幸福账单": "TOPC1451984801613379",
"中国文艺报道": "TOPC1601348042760302",
"舞蹈世界": "TOPC1451547605511387",
"艺览天下": "TOPC1451984851125433",
"天天把歌唱": "TOPC1451535663610626",
"金牌喜剧班": "TOPC1611826337610628",
"环球综艺秀": "TOPC1571300682556971",
"挑战不可能第五季": "TOPC1579169060379297",
"我们有一套": "TOPC1451527089955940",
"为了你": "TOPC1451527001597710",
"朗读者第一季": "TOPC1487120479377477",
"挑战不可能第二季": "TOPC1474277421637816",
"精彩一刻": "TOPC1451464786232149",
"挑战不可能之加油中国": "TOPC1547519813971570",
"挑战不可能第一季": "TOPC1452063816677656",
"机智过人第三季": "TOPC1564019920570762",
"经典咏流传第二季": "TOPC1547521714115947",
"挑战不可能第三季": "TOPC1509500865106312",
"经典咏流传第一季": "TOPC1513676755770201",
"欢乐中国人第二季": "TOPC1516784350726581",
"故事里的中国第一季": "TOPC1569729252342702",
"你好生活第二季": "TOPC1604397385056621",
"喜上加喜": "TOPC1590026042145705",
"走在回家的路上": "TOPC1577697653272281",
"综艺盛典": "TOPC1451985071887935",
"艺术人生": "TOPC1451984891490556",
"全家好拍档": "TOPC1474275463547690",
"大魔术师": "TOPC1451984047073332",
"欢乐一家亲": "TOPC1451984214170587",
"开心辞典": "TOPC1451984378754815",
"综艺星天地": "TOPC1451985188986150",
"激情广场": "TOPC1451984341218765",
"笑星大联盟": "TOPC1451984731428297",
"天天乐": "TOPC1451984447718918",
"欢乐英雄": "TOPC1451984242834620",
"欢乐中国行": "TOPC1451984301286720",
"我爱满堂彩": "TOPC1451538709371329",
"综艺头条": "TOPC1569226855085860",
"中华情": "TOPC1451541564922207",
"魔法奇迹": "TOPC1451542029126607"
}
classes = []
for k in cateManual:
classes.append({
'type_name': k,
'type_id': cateManual[k]
})
result['class'] = classes
if filter:
result['filters'] = self.config['filter']
return result
def homeVideoContent(self):
result = {
'list': []
}
return result
def categoryContent(self, tid, pg, filter, extend):
result = {}
extend['id'] = tid
extend['p'] = pg
filterParams = ["id", "p", "d"]
params = ["", "", ""]
for idx in range(len(filterParams)):
fp = filterParams[idx]
if fp in extend.keys():
params[idx] = '{0}={1}'.format(filterParams[idx], extend[fp])
suffix = '&'.join(params)
url = 'https://api.cntv.cn/NewVideo/getVideoListByColumn?{0}&n=20&sort=desc&mode=0&serviceId=tvcctv&t=json'.format(suffix)
if not tid.startswith('TOPC'):
url = 'https://api.cntv.cn/NewVideo/getVideoListByAlbumIdNew?{0}&n=20&sort=desc&mode=0&serviceId=tvcctv&t=json'.format(suffix)
rsp = self.fetch(url, headers=self.header)
jo = json.loads(rsp.text)
vodList = jo['data']['list']
videos = []
for vod in vodList:
guid = vod['guid']
title = vod['title']
img = vod['image']
brief = vod['brief']
videos.append({
"vod_id": guid + "###" + img,
"vod_name": title,
"vod_pic": img,
"vod_remarks": ''
})
result['list'] = videos
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def detailContent(self, array):
aid = array[0].split('###')
tid = aid[0]
url = "https://vdn.apps.cntv.cn/api/getHttpVideoInfo.do?pid={0}".format(tid)
rsp = self.fetch(url, headers=self.header)
jo = json.loads(rsp.text)
title = jo['title'].strip()
link = jo['hls_url'].strip()
vod = {
"vod_id": tid,
"vod_name": title,
"vod_pic": aid[1],
"type_name": '',
"vod_year": "",
"vod_area": "",
"vod_remarks": "",
"vod_actor": "",
"vod_director": "",
"vod_content": ""
}
vod['vod_play_from'] = 'CCTV'
vod['vod_play_url'] = title + "$" + link
result = {
'list': [vod]
}
return result
def searchContent(self, key, quick):
result = {
'list': []
}
return result
def playerContent(self, flag, id, vipFlags):
result = {}
# 先尝试获取原始m3u8文件
rsp = self.fetch(id, headers=self.header)
content = rsp.text.strip()
if not content:
# 如果获取失败,直接返回原始链接
result["parse"] = 0
result["playUrl"] = ''
result["url"] = id
result["header"] = self.header
return result
arr = content.split('\n')
urlPrefix = self.regStr(id, '(http[s]?://[a-zA-z0-9.]+)/')
# 尝试获取高清链接
resolutions = ['2000', '1200', '800'] # 从高到低尝试
final_url = id # 默认使用原始链接
for res in resolutions:
try:
subUrl = arr[-1].split('/')
subUrl[3] = res
subUrl[-1] = f'{res}.m3u8'
hdUrl = urlPrefix + '/'.join(subUrl)
# 检查高清链接是否有效
hdRsp = requests.head(hdUrl, headers=self.header, timeout=5)
if hdRsp.status_code == 200:
final_url = hdUrl
break
except:
continue
result["parse"] = 0
result["playUrl"] = ''
result["url"] = final_url
result["header"] = self.header
return result
config = {
"player": {},
"filter": {
"TOPC1451557970755294": [
{
"key": "d",
"name": "年份",
"value": [
{"n": "全部", "v": ""},
{"n": "2023", "v": "2023"},
{"n": "2022", "v": "2022"},
{"n": "2021", "v": "2021"},
{"n": "2020", "v": "2020"},
{"n": "2019", "v": "2019"},
{"n": "2018", "v": "2018"},
{"n": "2017", "v": "2017"},
{"n": "2016", "v": "2016"},
{"n": "2015", "v": "2015"},
{"n": "2014", "v": "2014"},
{"n": "2013", "v": "2013"},
{"n": "2012", "v": "2012"},
{"n": "2011", "v": "2011"},
{"n": "2010", "v": "2010"},
{"n": "2009", "v": "2009"}
]
}
]
}
}
header = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.54 Safari/537.36",
"Referer": "https://www.cctv.com/",
"Origin": "https://www.cctv.com"
}
def localProxy(self, param):
return [200, "video/MP2T", None, ""]