Add files via upload

parent 7d1f232238
commit 4d345de506
PY/分享云端.py (new file, 245 lines)
@@ -0,0 +1,245 @@
# -*- coding: utf-8 -*-
# by @嗷呜
import re
import sys
import threading
import requests
from Crypto.Hash import MD5
sys.path.append("..")
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from urllib.parse import quote, urlparse
from base64 import b64encode, b64decode
import json
import time
from base.spider import Spider


class Spider(Spider):

    def init(self, extend=""):
        self.host = self.gethost()
        pass

    def isVideoFormat(self, url):
        pass

    def manualVideoCheck(self):
        pass

    def action(self, action):
        pass

    def destroy(self):
        pass

    def homeContent(self, filter):
        data = self.getdata("/api.php/getappapi.index/initV119")
        dy = {"class": "类型", "area": "地区", "lang": "语言", "year": "年份", "letter": "字母", "by": "排序",
              "sort": "排序"}
        filters = {}
        classes = []
        json_data = data["type_list"]
        homedata = data["banner_list"][8:]
        for item in json_data:
            if item["type_name"] == "全部":
                continue
            has_non_empty_field = False
            jsontype_extend = json.loads(item["type_extend"])
            homedata.extend(item["recommend_list"])
            jsontype_extend["sort"] = "最新,最热,最赞"
            classes.append({"type_name": item["type_name"], "type_id": item["type_id"]})
            for key in dy:
                if key in jsontype_extend and jsontype_extend[key].strip() != "":
                    has_non_empty_field = True
                    break
            if has_non_empty_field:
                filters[str(item["type_id"])] = []
                for dkey in jsontype_extend:
                    if dkey in dy and jsontype_extend[dkey].strip() != "":
                        values = jsontype_extend[dkey].split(",")
                        value_array = [{"n": value.strip(), "v": value.strip()} for value in values if
                                       value.strip() != ""]
                        filters[str(item["type_id"])].append({"key": dkey, "name": dy[dkey], "value": value_array})
        result = {}
        result["class"] = classes
        result["filters"] = filters
        result["list"] = homedata[1:]
        return result

    def homeVideoContent(self):
        pass

    def categoryContent(self, tid, pg, filter, extend):
        body = {"area": extend.get('area', '全部'), "year": extend.get('year', '全部'), "type_id": tid, "page": pg,
                "sort": extend.get('sort', '最新'), "lang": extend.get('lang', '全部'),
                "class": extend.get('class', '全部')}
        result = {}
        data = self.getdata("/api.php/getappapi.index/typeFilterVodList", body)
        result["list"] = data["recommend_list"]
        result["page"] = pg
        result["pagecount"] = 9999
        result["limit"] = 90
        result["total"] = 999999
        return result

    def detailContent(self, ids):
        body = f"vod_id={ids[0]}"
        data = self.getdata("/api.php/getappapi.index/vodDetail", body)
        vod = data["vod"]
        play = []
        names = []
        for itt in data["vod_play_list"]:
            a = []
            names.append(itt["player_info"]["show"])
            for it in itt['urls']:
                it['user_agent'] = itt["player_info"].get("user_agent")
                it["parse"] = itt["player_info"].get("parse")
                a.append(f"{it['name']}${self.e64(json.dumps(it))}")
            play.append("#".join(a))
        vod["vod_play_from"] = "$$$".join(names)
        vod["vod_play_url"] = "$$$".join(play)
        result = {"list": [vod]}
        return result

    def searchContent(self, key, quick, pg="1"):
        body = f"keywords={key}&type_id=0&page={pg}"
        data = self.getdata("/api.php/getappapi.index/searchList", body)
        result = {"list": data["search_list"], "page": pg}
        return result

    def playerContent(self, flag, id, vipFlags):
        ids = json.loads(self.d64(id))
        h = {"User-Agent": (ids['user_agent'] or "okhttp/3.14.9")}
        try:
            if re.search(r'url=', ids['parse_api_url']):
                data = self.fetch(ids['parse_api_url'], headers=h, timeout=10).json()
                url = data.get('url') or data['data'].get('url')
            else:
                body = f"parse_api={ids.get('parse') or ids['parse_api_url'].replace(ids['url'], '')}&url={quote(self.aes(ids['url'], True))}&token={ids.get('token')}"
                b = self.getdata("/api.php/getappapi.index/vodParse", body)['json']
                url = json.loads(b)['url']
            if 'error' in url: raise ValueError(f"解析失败: {url}")
            p = 0
        except Exception as e:
            print('错误信息:', e)
            url, p = ids['url'], 1

        if re.search(r'\.jpg|\.png|\.jpeg', url):
            url = self.Mproxy(url)
        result = {}
        result["parse"] = p
        result["url"] = url
        result["header"] = h
        return result

    def localProxy(self, param):
        return self.Mlocal(param)

    def gethost(self):
        headers = {
            'User-Agent': 'okhttp/3.14.9'
        }
        response = self.fetch('https://ydysdynamicdomainname.68.gy:10678/c9m2js298x82h6/l9m8bx23j2o2p9q/dynamicdomainname.txt',
                              headers=headers).text
        return self.host_late(response.split('\n'))

    def host_late(self, url_list):
        if isinstance(url_list, str):
            urls = [u.strip() for u in url_list.split(',')]
        else:
            urls = url_list

        if len(urls) <= 1:
            return urls[0] if urls else ''

        results = {}
        threads = []

        def test_host(url):
            try:
                start_time = time.time()
                response = requests.head(url, timeout=1.0, allow_redirects=False)
                delay = (time.time() - start_time) * 1000
                results[url] = delay
            except Exception as e:
                results[url] = float('inf')

        for url in urls:
            t = threading.Thread(target=test_host, args=(url,))
            threads.append(t)
            t.start()

        for t in threads:
            t.join()

        return min(results.items(), key=lambda x: x[1])[0]

    def aes(self, text, b=None):
        key = b"k9o3p2c8b7m3z0o8"
        cipher = AES.new(key, AES.MODE_CBC, key)
        if b:
            ct_bytes = cipher.encrypt(pad(text.encode("utf-8"), AES.block_size))
            ct = b64encode(ct_bytes).decode("utf-8")
            return ct
        else:
            pt = unpad(cipher.decrypt(b64decode(text)), AES.block_size)
            return pt.decode("utf-8")

    def header(self):
        t = str(int(time.time()))
        header = {"Referer": self.host,
                  "User-Agent": "okhttp/3.14.9", "app-version-code": "140", "app-ui-mode": "light",
                  "app-api-verify-time": t, "app-user-device-id": self.md5(t),
                  "app-api-verify-sign": self.aes(t, True),
                  "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"}
        return header

    def getdata(self, path, data=None):
        vdata = self.post(f"{self.host}{path}", headers=self.header(), data=data, timeout=10).json()['data']
        data1 = self.aes(vdata)
        return json.loads(data1)

    def Mproxy(self, url):
        return f"{self.getProxyUrl()}&url={self.e64(url)}&type=m3u8"

    def Mlocal(self, param, header=None):
        url = self.d64(param["url"])
        ydata = self.fetch(url, headers=header, allow_redirects=False)
        data = ydata.content.decode('utf-8')
        if ydata.headers.get('Location'):
            url = ydata.headers['Location']
            data = self.fetch(url, headers=header).content.decode('utf-8')
        parsed_url = urlparse(url)
        durl = parsed_url.scheme + "://" + parsed_url.netloc
        lines = data.strip().split('\n')
        for index, string in enumerate(lines):
            if '#EXT' not in string and 'http' not in string:
                last_slash_index = string.rfind('/')
                lpath = string[:last_slash_index + 1]
                lines[index] = durl + ('' if lpath.startswith('/') else '/') + lpath
        data = '\n'.join(lines)
        return [200, "application/vnd.apple.mpegur", data]

    def e64(self, text):
        try:
            text_bytes = text.encode('utf-8')
            encoded_bytes = b64encode(text_bytes)
            return encoded_bytes.decode('utf-8')
        except Exception as e:
            print(f"Base64编码错误: {str(e)}")
            return ""

    def d64(self, encoded_text):
        try:
            encoded_bytes = encoded_text.encode('utf-8')
            decoded_bytes = b64decode(encoded_bytes)
            return decoded_bytes.decode('utf-8')
        except Exception as e:
            print(f"Base64解码错误: {str(e)}")
            return ""

    def md5(self, text):
        h = MD5.new()
        h.update(text.encode('utf-8'))
        return h.hexdigest()
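The sign handling above (aes(), header(), getdata()) is a single AES-CBC round-trip: the unix timestamp is encrypted into the `app-api-verify-sign` header, and the API's base64 `data` field is decrypted the same way before `json.loads`. A minimal standalone sketch, assuming pycryptodome is installed; the helper names are illustrative, the key is the one hard-coded in this file and doubles as the IV:

```python
from base64 import b64encode, b64decode
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad

KEY = b"k9o3p2c8b7m3z0o8"  # hard-coded in 分享云端.py; reused as the CBC IV

def encrypt_sign(timestamp: str) -> str:
    # What header() places in "app-api-verify-sign": AES-CBC + PKCS7 + base64.
    cipher = AES.new(KEY, AES.MODE_CBC, KEY)
    return b64encode(cipher.encrypt(pad(timestamp.encode("utf-8"), AES.block_size))).decode("utf-8")

def decrypt_payload(ciphertext_b64: str) -> str:
    # What getdata() does to the response's "data" field before json.loads().
    cipher = AES.new(KEY, AES.MODE_CBC, KEY)
    return unpad(cipher.decrypt(b64decode(ciphertext_b64)), AES.block_size).decode("utf-8")

if __name__ == "__main__":
    sign = encrypt_sign("1700000000")
    assert decrypt_payload(sign) == "1700000000"  # the two directions round-trip
```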
PY/分享云速 2.py (new file, 219 lines)
@@ -0,0 +1,219 @@
import re
import sys
from Crypto.Hash import MD5
sys.path.append("..")
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from urllib.parse import quote, urlparse
from base64 import b64encode, b64decode
import json
import time
from base.spider import Spider


class Spider(Spider):

    def init(self, extend=""):
        self.host = self.gethost()
        self.did = self.getdid()
        pass

    def isVideoFormat(self, url):
        pass

    def manualVideoCheck(self):
        pass

    def action(self, action):
        pass

    def destroy(self):
        pass

    def homeContent(self, filter):
        data = self.getdata("/api.php/getappapi.index/initV119")
        dy = {"class": "类型", "area": "地区", "lang": "语言", "year": "年份", "letter": "字母", "by": "排序",
              "sort": "排序"}
        filters = {}
        classes = []
        json_data = data["type_list"]
        homedata = data["banner_list"][8:]
        for item in json_data:
            if item["type_name"] == "全部":
                continue
            has_non_empty_field = False
            jsontype_extend = json.loads(item["type_extend"])
            homedata.extend(item["recommend_list"])
            jsontype_extend["sort"] = "最新,最热,最赞"
            classes.append({"type_name": item["type_name"], "type_id": item["type_id"]})
            for key in dy:
                if key in jsontype_extend and jsontype_extend[key].strip() != "":
                    has_non_empty_field = True
                    break
            if has_non_empty_field:
                filters[str(item["type_id"])] = []
                for dkey in jsontype_extend:
                    if dkey in dy and jsontype_extend[dkey].strip() != "":
                        values = jsontype_extend[dkey].split(",")
                        value_array = [{"n": value.strip(), "v": value.strip()} for value in values if
                                       value.strip() != ""]
                        filters[str(item["type_id"])].append({"key": dkey, "name": dy[dkey], "value": value_array})
        result = {}
        result["class"] = classes
        result["filters"] = filters
        result["list"] = homedata[1:]
        return result

    def homeVideoContent(self):
        pass

    def categoryContent(self, tid, pg, filter, extend):
        body = {"area": extend.get('area', '全部'), "year": extend.get('year', '全部'), "type_id": tid, "page": pg,
                "sort": extend.get('sort', '最新'), "lang": extend.get('lang', '全部'),
                "class": extend.get('class', '全部')}
        result = {}
        data = self.getdata("/api.php/getappapi.index/typeFilterVodList", body)
        result["list"] = data["recommend_list"]
        result["page"] = pg
        result["pagecount"] = 9999
        result["limit"] = 90
        result["total"] = 999999
        return result

    def detailContent(self, ids):
        body = f"vod_id={ids[0]}"
        data = self.getdata("/api.php/getappapi.index/vodDetail", body)
        vod = data["vod"]
        play = []
        names = []
        for itt in data["vod_play_list"]:
            a = []
            names.append(itt["player_info"]["show"])
            for it in itt['urls']:
                it['user_agent'] = itt["player_info"].get("user_agent")
                it["parse"] = itt["player_info"].get("parse")
                a.append(f"{it['name']}${self.e64(json.dumps(it))}")
            play.append("#".join(a))
        vod["vod_play_from"] = "$$$".join(names)
        vod["vod_play_url"] = "$$$".join(play)
        result = {"list": [vod]}
        return result

    def searchContent(self, key, quick, pg="1"):
        body = f"keywords={key}&type_id=0&page={pg}"
        data = self.getdata("/api.php/getappapi.index/searchList", body)
        result = {"list": data["search_list"], "page": pg}
        return result

    def playerContent(self, flag, id, vipFlags):
        ids = json.loads(self.d64(id))
        h = {"User-Agent": (ids['user_agent'] or "okhttp/3.14.9")}
        try:
            if re.search(r'url=', ids['parse_api_url']):
                data = self.fetch(ids['parse_api_url'], headers=h, timeout=10).json()
                url = data.get('url') or data['data'].get('url')
            else:
                body = f"parse_api={ids.get('parse') or ids['parse_api_url'].replace(ids['url'], '')}&url={quote(self.aes(ids['url'], True))}&token={ids.get('token')}"
                b = self.getdata("/api.php/getappapi.index/vodParse", body)['json']
                url = json.loads(b)['url']
            if 'error' in url: raise ValueError(f"解析失败: {url}")
            p = 0
        except Exception as e:
            print('错误信息:', e)
            url, p = ids['url'], 1

        if re.search(r'\.jpg|\.png|\.jpeg', url):
            url = self.Mproxy(url)
        result = {}
        result["parse"] = p
        result["url"] = url
        result["header"] = h
        return result

    def localProxy(self, param):
        return self.Mlocal(param)

    def gethost(self):
        headers = {
            'User-Agent': 'okhttp/3.14.9'
        }
        response = self.fetch('https://jingyu-1312635929.cos.ap-nanjing.myqcloud.com/1.json', headers=headers).text
        return response.strip()

    def getdid(self):
        did = self.getCache('did')
        if not did:
            t = str(int(time.time()))
            did = self.md5(t)
            self.setCache('did', did)
        return did

    def aes(self, text, b=None):
        key = b"4d83b87c4c5ea111"
        cipher = AES.new(key, AES.MODE_CBC, key)
        if b:
            ct_bytes = cipher.encrypt(pad(text.encode("utf-8"), AES.block_size))
            ct = b64encode(ct_bytes).decode("utf-8")
            return ct
        else:
            pt = unpad(cipher.decrypt(b64decode(text)), AES.block_size)
            return pt.decode("utf-8")

    def header(self):
        t = str(int(time.time()))
        header = {"Referer": self.host,
                  "User-Agent": "okhttp/3.14.9", "app-version-code": "300", "app-ui-mode": "light",
                  "app-api-verify-time": t, "app-user-device-id": self.did,
                  "app-api-verify-sign": self.aes(t, True),
                  "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"}
        return header

    def getdata(self, path, data=None):
        vdata = self.post(f"{self.host}{path}", headers=self.header(), data=data, timeout=10).json()['data']
        data1 = self.aes(vdata)
        return json.loads(data1)

    def Mproxy(self, url):
        return f"{self.getProxyUrl()}&url={self.e64(url)}&type=m3u8"

    def Mlocal(self, param, header=None):
        url = self.d64(param["url"])
        ydata = self.fetch(url, headers=header, allow_redirects=False)
        data = ydata.content.decode('utf-8')
        if ydata.headers.get('Location'):
            url = ydata.headers['Location']
            data = self.fetch(url, headers=header).content.decode('utf-8')
        parsed_url = urlparse(url)
        durl = parsed_url.scheme + "://" + parsed_url.netloc
        lines = data.strip().split('\n')
        for index, string in enumerate(lines):
            if '#EXT' not in string and 'http' not in string:
                last_slash_index = string.rfind('/')
                lpath = string[:last_slash_index + 1]
                lines[index] = durl + ('' if lpath.startswith('/') else '/') + lpath
        data = '\n'.join(lines)
        return [200, "application/vnd.apple.mpegur", data]

    def e64(self, text):
        try:
            text_bytes = text.encode('utf-8')
            encoded_bytes = b64encode(text_bytes)
            return encoded_bytes.decode('utf-8')
        except Exception as e:
            print(f"Base64编码错误: {str(e)}")
            return ""

    def d64(self, encoded_text):
        try:
            encoded_bytes = encoded_text.encode('utf-8')
            decoded_bytes = b64decode(encoded_bytes)
            return decoded_bytes.decode('utf-8')
        except Exception as e:
            print(f"Base64解码错误: {str(e)}")
            return ""

    def md5(self, text):
        h = MD5.new()
        h.update(text.encode('utf-8'))
        return h.hexdigest()
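detailContent() above packs each episode as `name$<base64-encoded JSON>`, joins episodes with `#` and play sources with `$$$`; playerContent() later base64-decodes and `json.loads` the part after `$`. A minimal round-trip sketch of that packing (helper names and sample URLs are illustrative only):

```python
import json
from base64 import b64encode, b64decode

def pack_episode(name: str, info: dict) -> str:
    payload = b64encode(json.dumps(info).encode("utf-8")).decode("utf-8")
    return f"{name}${payload}"

def unpack_episode(token: str) -> dict:
    # token is the piece after "$" that the player later hands back as `id`
    return json.loads(b64decode(token.encode("utf-8")).decode("utf-8"))

episodes = [pack_episode("第1集", {"url": "https://example.com/ep1.m3u8", "parse": 0}),
            pack_episode("第2集", {"url": "https://example.com/ep2.m3u8", "parse": 0})]
vod_play_url = "#".join(episodes)  # one play source
first = unpack_episode(vod_play_url.split("#")[0].split("$", 1)[1])
assert first["url"].endswith("ep1.m3u8")
```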
PY/分享云速.py (new file, 219 lines)
@@ -0,0 +1,219 @@
import re
import sys
from Crypto.Hash import MD5
sys.path.append("..")
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from urllib.parse import quote, urlparse
from base64 import b64encode, b64decode
import json
import time
from base.spider import Spider


class Spider(Spider):

    def init(self, extend=""):
        self.host = self.gethost()
        self.did = self.getdid()
        pass

    def isVideoFormat(self, url):
        pass

    def manualVideoCheck(self):
        pass

    def action(self, action):
        pass

    def destroy(self):
        pass

    def homeContent(self, filter):
        data = self.getdata("/api.php/getappapi.index/initV119")
        dy = {"class": "类型", "area": "地区", "lang": "语言", "year": "年份", "letter": "字母", "by": "排序",
              "sort": "排序"}
        filters = {}
        classes = []
        json_data = data["type_list"]
        homedata = data["banner_list"][8:]
        for item in json_data:
            if item["type_name"] == "全部":
                continue
            has_non_empty_field = False
            jsontype_extend = json.loads(item["type_extend"])
            homedata.extend(item["recommend_list"])
            jsontype_extend["sort"] = "最新,最热,最赞"
            classes.append({"type_name": item["type_name"], "type_id": item["type_id"]})
            for key in dy:
                if key in jsontype_extend and jsontype_extend[key].strip() != "":
                    has_non_empty_field = True
                    break
            if has_non_empty_field:
                filters[str(item["type_id"])] = []
                for dkey in jsontype_extend:
                    if dkey in dy and jsontype_extend[dkey].strip() != "":
                        values = jsontype_extend[dkey].split(",")
                        value_array = [{"n": value.strip(), "v": value.strip()} for value in values if
                                       value.strip() != ""]
                        filters[str(item["type_id"])].append({"key": dkey, "name": dy[dkey], "value": value_array})
        result = {}
        result["class"] = classes
        result["filters"] = filters
        result["list"] = homedata[1:]
        return result

    def homeVideoContent(self):
        pass

    def categoryContent(self, tid, pg, filter, extend):
        body = {"area": extend.get('area', '全部'), "year": extend.get('year', '全部'), "type_id": tid, "page": pg,
                "sort": extend.get('sort', '最新'), "lang": extend.get('lang', '全部'),
                "class": extend.get('class', '全部')}
        result = {}
        data = self.getdata("/api.php/getappapi.index/typeFilterVodList", body)
        result["list"] = data["recommend_list"]
        result["page"] = pg
        result["pagecount"] = 9999
        result["limit"] = 90
        result["total"] = 999999
        return result

    def detailContent(self, ids):
        body = f"vod_id={ids[0]}"
        data = self.getdata("/api.php/getappapi.index/vodDetail", body)
        vod = data["vod"]
        play = []
        names = []
        for itt in data["vod_play_list"]:
            a = []
            names.append(itt["player_info"]["show"])
            for it in itt['urls']:
                it['user_agent'] = itt["player_info"].get("user_agent")
                it["parse"] = itt["player_info"].get("parse")
                a.append(f"{it['name']}${self.e64(json.dumps(it))}")
            play.append("#".join(a))
        vod["vod_play_from"] = "$$$".join(names)
        vod["vod_play_url"] = "$$$".join(play)
        result = {"list": [vod]}
        return result

    def searchContent(self, key, quick, pg="1"):
        body = f"keywords={key}&type_id=0&page={pg}"
        data = self.getdata("/api.php/getappapi.index/searchList", body)
        result = {"list": data["search_list"], "page": pg}
        return result

    def playerContent(self, flag, id, vipFlags):
        ids = json.loads(self.d64(id))
        h = {"User-Agent": (ids['user_agent'] or "okhttp/3.14.9")}
        try:
            if re.search(r'url=', ids['parse_api_url']):
                data = self.fetch(ids['parse_api_url'], headers=h, timeout=10).json()
                url = data.get('url') or data['data'].get('url')
            else:
                body = f"parse_api={ids.get('parse') or ids['parse_api_url'].replace(ids['url'], '')}&url={quote(self.aes(ids['url'], True))}&token={ids.get('token')}"
                b = self.getdata("/api.php/getappapi.index/vodParse", body)['json']
                url = json.loads(b)['url']
            if 'error' in url: raise ValueError(f"解析失败: {url}")
            p = 0
        except Exception as e:
            print('错误信息:', e)
            url, p = ids['url'], 1

        if re.search(r'\.jpg|\.png|\.jpeg', url):
            url = self.Mproxy(url)
        result = {}
        result["parse"] = p
        result["url"] = url
        result["header"] = h
        return result

    def localProxy(self, param):
        return self.Mlocal(param)

    def gethost(self):
        headers = {
            'User-Agent': 'okhttp/3.14.9'
        }
        response = self.fetch('https://jingyu-1312635929.cos.ap-nanjing.myqcloud.com/1.json', headers=headers).text
        return response.strip()

    def getdid(self):
        did = self.getCache('did')
        if not did:
            t = str(int(time.time()))
            did = self.md5(t)
            self.setCache('did', did)
        return did

    def aes(self, text, b=None):
        key = b"4d83b87c4c5ea111"
        cipher = AES.new(key, AES.MODE_CBC, key)
        if b:
            ct_bytes = cipher.encrypt(pad(text.encode("utf-8"), AES.block_size))
            ct = b64encode(ct_bytes).decode("utf-8")
            return ct
        else:
            pt = unpad(cipher.decrypt(b64decode(text)), AES.block_size)
            return pt.decode("utf-8")

    def header(self):
        t = str(int(time.time()))
        header = {"Referer": self.host,
                  "User-Agent": "okhttp/3.14.9", "app-version-code": "300", "app-ui-mode": "light",
                  "app-api-verify-time": t, "app-user-device-id": self.did,
                  "app-api-verify-sign": self.aes(t, True),
                  "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"}
        return header

    def getdata(self, path, data=None):
        vdata = self.post(f"{self.host}{path}", headers=self.header(), data=data, timeout=10).json()['data']
        data1 = self.aes(vdata)
        return json.loads(data1)

    def Mproxy(self, url):
        return f"{self.getProxyUrl()}&url={self.e64(url)}&type=m3u8"

    def Mlocal(self, param, header=None):
        url = self.d64(param["url"])
        ydata = self.fetch(url, headers=header, allow_redirects=False)
        data = ydata.content.decode('utf-8')
        if ydata.headers.get('Location'):
            url = ydata.headers['Location']
            data = self.fetch(url, headers=header).content.decode('utf-8')
        parsed_url = urlparse(url)
        durl = parsed_url.scheme + "://" + parsed_url.netloc
        lines = data.strip().split('\n')
        for index, string in enumerate(lines):
            if '#EXT' not in string and 'http' not in string:
                last_slash_index = string.rfind('/')
                lpath = string[:last_slash_index + 1]
                lines[index] = durl + ('' if lpath.startswith('/') else '/') + lpath
        data = '\n'.join(lines)
        return [200, "application/vnd.apple.mpegur", data]

    def e64(self, text):
        try:
            text_bytes = text.encode('utf-8')
            encoded_bytes = b64encode(text_bytes)
            return encoded_bytes.decode('utf-8')
        except Exception as e:
            print(f"Base64编码错误: {str(e)}")
            return ""

    def d64(self, encoded_text):
        try:
            encoded_bytes = encoded_text.encode('utf-8')
            decoded_bytes = b64decode(encoded_bytes)
            return decoded_bytes.decode('utf-8')
        except Exception as e:
            print(f"Base64解码错误: {str(e)}")
            return ""

    def md5(self, text):
        h = MD5.new()
        h.update(text.encode('utf-8'))
        return h.hexdigest()
PY/分享惜弱.py (new file, 255 lines)
@@ -0,0 +1,255 @@
# -*- coding: utf-8 -*-
# by @嗷呜
import re
import sys
from Crypto.Hash import MD5
sys.path.append("..")
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from urllib.parse import quote, urlparse
from base64 import b64encode, b64decode
from concurrent.futures import ThreadPoolExecutor
import json
import time
from base.spider import Spider


class Spider(Spider):

    '''
    The sites entry is configured as usual.
    lives entry:
    {
        "name": "xxxx",
        "type": 3,
        "api": "路径/若惜追剧APP.py",
        "ext": ""
    }
    '''

    def init(self, extend=""):
        self.host = self.gethost()
        pass

    def getName(self):
        pass

    def isVideoFormat(self, url):
        pass

    def manualVideoCheck(self):
        pass

    def action(self, action):
        pass

    def destroy(self):
        pass

    def homeContent(self, filter):
        data = self.getdata("/api.php/getappapi.index/initV119")
        dy = {"class": "类型", "area": "地区", "lang": "语言", "year": "年份", "letter": "字母", "by": "排序",
              "sort": "排序"}
        filters = {}
        classes = []
        json_data = data["type_list"]
        homedata = data["banner_list"][8:]
        for item in json_data:
            if item["type_name"] == "全部":
                continue
            has_non_empty_field = False
            jsontype_extend = json.loads(item["type_extend"])
            homedata.extend(item["recommend_list"])
            jsontype_extend["sort"] = "最新,最热,最赞"
            classes.append({"type_name": item["type_name"], "type_id": item["type_id"]})
            for key in dy:
                if key in jsontype_extend and jsontype_extend[key].strip() != "":
                    has_non_empty_field = True
                    break
            if has_non_empty_field:
                filters[str(item["type_id"])] = []
                for dkey in jsontype_extend:
                    if dkey in dy and jsontype_extend[dkey].strip() != "":
                        values = jsontype_extend[dkey].split(",")
                        value_array = [{"n": value.strip(), "v": value.strip()} for value in values if
                                       value.strip() != ""]
                        filters[str(item["type_id"])].append({"key": dkey, "name": dy[dkey], "value": value_array})
        result = {}
        result["class"] = classes
        result["filters"] = filters
        result["list"] = homedata[1:]
        return result

    def homeVideoContent(self):
        pass

    def categoryContent(self, tid, pg, filter, extend):
        body = {"area": extend.get('area', '全部'), "year": extend.get('year', '全部'), "type_id": tid, "page": pg,
                "sort": extend.get('sort', '最新'), "lang": extend.get('lang', '全部'),
                "class": extend.get('class', '全部')}
        result = {}
        data = self.getdata("/api.php/getappapi.index/typeFilterVodList", body)
        result["list"] = data["recommend_list"]
        result["page"] = pg
        result["pagecount"] = 9999
        result["limit"] = 90
        result["total"] = 999999
        return result

    def detailContent(self, ids):
        body = f"vod_id={ids[0]}"
        data = self.getdata("/api.php/getappapi.index/vodDetail", body)
        vod = data["vod"]
        play = []
        names = []
        for itt in data["vod_play_list"]:
            a = []
            names.append(itt["player_info"]["show"])
            for it in itt['urls']:
                it['user_agent'] = itt["player_info"].get("user_agent")
                it["parse"] = itt["player_info"].get("parse")
                a.append(f"{it['name']}${self.e64(json.dumps(it))}")
            play.append("#".join(a))
        vod["vod_play_from"] = "$$$".join(names)
        vod["vod_play_url"] = "$$$".join(play)
        result = {"list": [vod]}
        return result

    def searchContent(self, key, quick, pg="1"):
        body = f"keywords={key}&type_id=0&page={pg}"
        data = self.getdata("/api.php/getappapi.index/searchList", body)
        result = {"list": data["search_list"], "page": pg}
        return result

    def playerContent(self, flag, id, vipFlags):
        ids = json.loads(self.d64(id))
        h = {"User-Agent": (ids['user_agent'] or "okhttp/3.14.9")}
        try:
            if re.search(r'url=', ids['parse_api_url']):
                data = self.fetch(ids['parse_api_url'], headers=h, timeout=10).json()
                url = data.get('url') or data['data'].get('url')
            else:
                body = f"parse_api={ids.get('parse') or ids['parse_api_url'].replace(ids['url'], '')}&url={quote(self.aes(ids['url'], True))}&token={ids.get('token')}"
                b = self.getdata("/api.php/getappapi.index/vodParse", body)['json']
                url = json.loads(b)['url']
            if 'error' in url: raise ValueError(f"解析失败: {url}")
            p = 0
        except Exception as e:
            print('错误信息:', e)
            url, p = ids['url'], 1

        if re.search(r'\.jpg|\.png|\.jpeg', url):
            url = self.Mproxy(url)
        result = {}
        result["parse"] = p
        result["url"] = url
        result["header"] = h
        return result

    def liveContent(self, url):
        id = self.homeContent(True)['class'][-1]['type_id']
        vlist = self.categoryContent(id, 1, False, {})['list']
        results = []
        with ThreadPoolExecutor(max_workers=len(vlist)) as executor:
            futures = [executor.submit(self.livedetailContent, item['vod_name'], item['vod_id']) for item in vlist]
            for future in futures:
                try:
                    detail = future.result()
                    if detail:
                        results.append(detail)
                except Exception as e:
                    print(f"处理详情数据失败: {str(e)}")
        return '\n'.join(results)

    def livedetailContent(self, name, id):
        try:
            print(f"获取直播源:{name}")
            body = f"vod_id={id}"
            data = self.getdata("/api.php/getappapi.index/vodDetail", body)
            play = [f"{name},#genre#"]
            for itt in data["vod_play_list"]:
                for it in itt['urls']:
                    play.append(f"{it['name']}, {it['url']}")
        except Exception as e:
            print(f"获取直播源失败:{str(e)}")
            play = []
        return '\n'.join(play)

    def localProxy(self, param):
        return self.Mlocal(param)

    def gethost(self):
        headers = {
            'User-Agent': 'okhttp/3.14.9'
        }
        host = self.fetch('https://rxysyyds.oss-cn-chengdu.aliyuncs.com/getapp.txt', headers=headers).text
        return host.strip()

    def aes(self, text, b=None):
        key = b"ebad3f1a58b13933"
        cipher = AES.new(key, AES.MODE_CBC, key)
        if b:
            ct_bytes = cipher.encrypt(pad(text.encode("utf-8"), AES.block_size))
            ct = b64encode(ct_bytes).decode("utf-8")
            return ct
        else:
            pt = unpad(cipher.decrypt(b64decode(text)), AES.block_size)
            return pt.decode("utf-8")

    def header(self):
        t = str(int(time.time()))
        header = {"Referer": self.host,
                  "User-Agent": "okhttp/3.14.9", "app-version-code": "140", "app-ui-mode": "light",
                  "app-api-verify-time": t, "app-user-device-id": self.md5(t),
                  "app-api-verify-sign": self.aes(t, True),
                  "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"}
        return header

    def getdata(self, path, data=None):
        vdata = self.post(f"{self.host}{path}", headers=self.header(), data=data, timeout=10).json()['data']
        data1 = self.aes(vdata)
        return json.loads(data1)

    def Mproxy(self, url):
        return f"{self.getProxyUrl()}&url={self.e64(url)}&type=m3u8"

    def Mlocal(self, param, header=None):
        url = self.d64(param["url"])
        ydata = self.fetch(url, headers=header, allow_redirects=False)
        data = ydata.content.decode('utf-8')
        if ydata.headers.get('Location'):
            url = ydata.headers['Location']
            data = self.fetch(url, headers=header).content.decode('utf-8')
        parsed_url = urlparse(url)
        durl = parsed_url.scheme + "://" + parsed_url.netloc
        lines = data.strip().split('\n')
        for index, string in enumerate(lines):
            if '#EXT' not in string and 'http' not in string:
                last_slash_index = string.rfind('/')
                lpath = string[:last_slash_index + 1]
                lines[index] = durl + ('' if lpath.startswith('/') else '/') + lpath
        data = '\n'.join(lines)
        return [200, "application/vnd.apple.mpegur", data]

    def e64(self, text):
        try:
            text_bytes = text.encode('utf-8')
            encoded_bytes = b64encode(text_bytes)
            return encoded_bytes.decode('utf-8')
        except Exception as e:
            print(f"Base64编码错误: {str(e)}")
            return ""

    def d64(self, encoded_text):
        try:
            encoded_bytes = encoded_text.encode('utf-8')
            decoded_bytes = b64decode(encoded_bytes)
            return decoded_bytes.decode('utf-8')
        except Exception as e:
            print(f"Base64解码错误: {str(e)}")
            return ""

    def md5(self, text):
        h = MD5.new()
        h.update(text.encode('utf-8'))
        return h.hexdigest()
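liveContent() above flattens the newest category into a plain-text live list: one `名称,#genre#` group header per vod, followed by `集名, 地址` lines, with groups joined by newlines. A small sketch of just that text format (the sample data is made up):

```python
def build_live_group(group_name, episodes):
    # episodes: iterable of (episode_name, url) pairs
    lines = [f"{group_name},#genre#"]
    for name, url in episodes:
        lines.append(f"{name}, {url}")
    return "\n".join(lines)

live_txt = "\n".join([
    build_live_group("某剧", [("第1集", "https://example.com/1.m3u8"),
                              ("第2集", "https://example.com/2.m3u8")]),
])
print(live_txt)
```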
PY/分享蓝莓.py (new file, 391 lines)
@@ -0,0 +1,391 @@
# -*- coding: utf-8 -*-
import requests
import json
import time
import sys
import urllib.parse

sys.path.append('../../')
try:
    from base.spider import Spider
except ImportError:
    # Minimal stand-in base class so the file can be run locally
    class Spider:
        def init(self, extend=""):
            pass


class Spider(Spider):
    def __init__(self):
        self.siteUrl = "https://app.whjzjx.cn"
        # Category id mapping
        self.cateManual = {
            "古装": "5",
            "穿越": "17",
            "逆袭": "7",
            "重生": "6"
        }
        # Request headers
        self.headers = {
            "Connection": "keep-alive",
            "Content-Type": "application/x-www-form-urlencoded",
            "user-agent": "okhttp/4.10.0",
            "user_agent": "Mozilla/5.0 (Linux; Android 9; ASUS_I003DD Build/PI; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/68.0.3440.70 Mobile Safari/537.36",
            "Host": "app.whjzjx.cn",
            "Accept-Encoding": "gzip"
        }
        # Token cache
        self.token = None
        self.tokenExpireTime = 0

    def getName(self):
        # Spider display name
        return "蓝莓短剧"

    def init(self, extend=""):
        return

    def isVideoFormat(self, url):
        # Check whether the URL already points at a video file
        video_formats = ['.mp4', '.m3u8', '.ts']
        for format in video_formats:
            if format in url.lower():
                return True
        return False

    def manualVideoCheck(self):
        # No manual check needed
        return False

    def getToken(self):
        """Fetch the API access token."""
        # Return the cached token while it is still valid
        current_time = time.time()
        if self.token and current_time < self.tokenExpireTime:
            return self.token

        # Otherwise request a new one
        try:
            tkurl = 'https://app.whjzjx.cn/v1/account/login'
            body = "device=20caaae96b3443174bf4ebdbdcc253776"

            response = requests.post(
                tkurl,
                headers=self.headers,
                data=body
            )

            if response.status_code == 200:
                json_data = response.json()
                # Fix: the server may return "ok" instead of 0
                if json_data.get('code') == 0 or json_data.get('code') == "ok" or json_data.get('status') == 0:
                    self.token = json_data['data']['token']
                    # Cache the token for one hour
                    self.tokenExpireTime = current_time + 3600
                    return self.token

            print(f"获取token失败: {response.text}")
            return None
        except Exception as e:
            print(f"获取token异常: {str(e)}")
            return None

    def fetchWithToken(self, url, method="GET", body=None):
        """HTTP request with the auth token attached."""
        token = self.getToken()
        if not token:
            print("无法获取token")
            return None

        headers = self.headers.copy()
        headers["authorization"] = token

        try:
            if method.upper() == "GET":
                response = requests.get(url, headers=headers, timeout=10)
            else:  # POST
                response = requests.post(url, headers=headers, data=body, timeout=10)

            response.raise_for_status()
            return response
        except Exception as e:
            print(f"请求失败: {url}, 错误: {str(e)}")
            return None

    def homeContent(self, filter):
        """Home categories and filters."""
        result = {}
        classes = []

        # Add categories
        for k in self.cateManual:
            classes.append({
                'type_id': self.cateManual[k],
                'type_name': k
            })

        result['class'] = classes

        # Recommended videos for the home page
        try:
            result['list'] = self.homeVideoContent()['list']
        except:
            result['list'] = []

        return result

    def homeVideoContent(self):
        """Recommended videos for the home page."""
        # Use the first category as the home recommendation
        first_cate = list(self.cateManual.values())[0]
        result = self.categoryContent(first_cate, 1, False, None)
        # Only log when the list came back empty
        if not result.get('list'):
            print("未获取到首页推荐视频")
        return result

    def categoryContent(self, tid, pg, filter, extend):
        """Category listing."""
        result = {}
        videos = []

        try:
            # Build the category page URL
            url = f"{self.siteUrl}/v1/theater/home_page?theater_class_id={tid}&page_num={int(pg)-1}&page_size=24"

            response = self.fetchWithToken(url)
            if not response:
                return result

            json_data = response.json()

            # Treat code == 0 / "ok" or status == 0 as success
            if not(json_data.get('code') == 0 or json_data.get('code') == "ok" or json_data.get('status') == 0):
                print(f"获取分类数据失败: {json_data}")
                return result

            # Parse the video list instead of dumping json_data
            data_list = json_data.get('data', {}).get('list', [])
            for item in data_list:
                theater = item.get('theater', {})
                if not theater:
                    continue

                video_id = theater.get('id')
                title = theater.get('title')
                cover = theater.get('cover_url')
                total = theater.get('total', '')
                play_amount = theater.get('play_amount_str', '')

                videos.append({
                    "vod_id": video_id,
                    "vod_name": title,
                    "vod_pic": cover,
                    "vod_remarks": f"{total}集",
                    "vod_content": f"播放量:{play_amount}"
                })

            # Build the result
            result = {
                'list': videos,
                'page': pg,
                'pagecount': 9999,  # assume many pages
                'limit': 24,
                'total': 999999  # large placeholder value
            }
        except Exception as e:
            print(f"获取分类内容异常: {str(e)}")

        return result

    def detailContent(self, ids):
        """Detail page."""
        video_id = ids[0]
        result = {}

        try:
            # Build the detail URL
            url = f"{self.siteUrl}/v2/theater_parent/detail?theater_parent_id={video_id}"

            response = self.fetchWithToken(url)
            if not response:
                return {}

            json_data = response.json()
            if not(json_data.get('code') == 0 or json_data.get('code') == "ok" or json_data.get('status') == 0):
                print(f"获取详情数据失败: {json_data}")
                return {}

            # Parse detail data
            data = json_data.get('data', {})
            title = data.get('title', '')
            cover = data.get('cover_url', '')
            total = data.get('total', '')

            # Extract the episode list
            theaters = data.get('theaters', [])
            episodes = []

            for index, theater in enumerate(theaters):
                ep_name = f"第{theater.get('num', '')}集"
                # Build ids as video_id_episode_index so playerContent can split them apart
                ep_url = f"{video_id}_{index}"
                episodes.append(f"{ep_name}${ep_url}")

            # Build the VOD entry
            vod = {
                "vod_id": video_id,
                "vod_name": title,
                "vod_pic": cover,
                "vod_remarks": f"{total}集",
                "vod_content": data.get('introduction', ''),
                "vod_play_from": "蓝莓短剧",
                "vod_play_url": "#".join(episodes)
            }

            result = {
                'list': [vod]
            }
        except Exception as e:
            print(f"获取详情内容异常: {str(e)}")

        return result

    def searchContent(self, key, quick, pg=1):
        """Search."""
        result = {}
        videos = []

        try:
            # Build the search request
            url = f"{self.siteUrl}/v2/search"
            body = f"text={urllib.parse.quote(key)}"

            response = self.fetchWithToken(url, method="POST", body=body)
            if not response:
                return {}

            json_data = response.json()
            # Use the same success check as detailContent
            if not(json_data.get('code') == 0 or json_data.get('code') == "ok" or json_data.get('status') == 0):
                print(f"搜索数据失败: {json_data}")
                return {}

            # Parse search results
            search_data = json_data.get('data', {}).get('search_data', [])
            for item in search_data:
                video_id = item.get('id')
                title = item.get('title')
                cover = item.get('cover_url')
                score = item.get('score_str', '')
                total = item.get('total', '')

                videos.append({
                    "vod_id": video_id,
                    "vod_name": title,
                    "vod_pic": cover,
                    "vod_remarks": f"{score}|{total}集"
                })

            result = {
                'list': videos,
                'page': pg
            }
        except Exception as e:
            print(f"搜索内容异常: {str(e)}")

        print(11111111, result)
        return result

    def searchContentPage(self, key, quick, pg=1):
        return self.searchContent(key, quick, pg)

    def playerContent(self, flag, id, vipFlags):
        """Resolve playback info."""
        result = {}

        # Already a direct video URL?
        if self.isVideoFormat(id):
            result["parse"] = 0
            result["url"] = id
            result["playUrl"] = ""
            result["header"] = json.dumps(self.headers)
            return result

        # Otherwise resolve the id
        try:
            # Split the id into series id and episode index
            if id.isdigit():
                # A bare numeric id is the series id; fetch the detail and use the first episode
                video_id = id
                ep_index = 0  # first episode by default
            elif '_' in id:
                # Ids with an underscore follow the video_id_episode_index format
                parts = id.split('_')
                if len(parts) >= 2:
                    video_id = parts[0]  # the numeric series id
                    ep_index = int(parts[1])
                else:
                    video_id = id
                    ep_index = 0
            else:
                # Assume the id is already a playable URL
                result["parse"] = 0
                result["url"] = id
                result["playUrl"] = ""
                result["header"] = json.dumps(self.headers)
                return result

            # Query the detail endpoint again for the episode list,
            # using only the numeric series id as theater_parent_id
            detail_url = f"{self.siteUrl}/v2/theater_parent/detail?theater_parent_id={video_id}"
            print(f"请求详情URL: {detail_url}")
            detail_response = self.fetchWithToken(detail_url)

            if not detail_response or detail_response.status_code != 200:
                print("获取详情数据失败")
                return result

            detail_json = detail_response.json()
            # Use the same success check as detailContent
            if not(detail_json.get('code') == 0 or detail_json.get('code') == "ok" or detail_json.get('status') == 0):
                print(f"获取详情数据错误: {detail_json}")
                return result

            # Episode list
            theaters = detail_json.get('data', {}).get('theaters', [])

            if not theaters or ep_index >= len(theaters):
                print(f"未找到剧集或索引超出范围: {ep_index}")
                return result

            # Pick the requested episode
            episode = theaters[ep_index]
            video_url = episode.get('son_video_url', '')

            if not video_url:
                print(f"未找到视频URL")
                return result

            # Headers required for playback
            play_headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
                "Referer": "http://qcapp.xingya.com.cn/"
            }

            # Return playback info
            result["parse"] = 0
            result["url"] = video_url
            result["playUrl"] = ""
            result["header"] = json.dumps(play_headers)

        except Exception as e:
            print(f"获取播放内容异常: {str(e)}")
            import traceback
            print(traceback.format_exc())

        return result

    def localProxy(self, param):
        """Local proxy handler; simply echoes the parameter back."""
        return [200, "video/MP2T", {}, param]
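detailContent() and playerContent() above agree on a simple episode-id scheme: the play id is `<theater_parent_id>_<episode_index>`, split apart again before the detail endpoint is re-queried for `son_video_url`. A minimal sketch of that encode/decode step (function names are illustrative):

```python
def make_play_id(video_id: str, ep_index: int) -> str:
    return f"{video_id}_{ep_index}"

def parse_play_id(play_id: str):
    if "_" in play_id:
        vid, idx = play_id.split("_", 1)
        return vid, int(idx)
    return play_id, 0  # a bare numeric id falls back to the first episode

assert parse_play_id(make_play_id("12345", 3)) == ("12345", 3)
```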
PY/分享边缘.py (new file, 340 lines)
@@ -0,0 +1,340 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# by @嗷呜
|
||||
import binascii
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
import uuid
|
||||
from urllib.parse import urlparse
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
sys.path.append('..')
|
||||
from base.spider import Spider
|
||||
from base64 import b64encode, b64decode
|
||||
from Crypto.PublicKey import RSA
|
||||
from Crypto.Cipher import AES, PKCS1_v1_5
|
||||
from Crypto.Util.Padding import unpad, pad
|
||||
from Crypto.Hash import MD5
|
||||
|
||||
|
||||
class Spider(Spider):
|
||||
|
||||
def init(self, extend=""):
|
||||
self.host = self.gethost()
|
||||
pass
|
||||
|
||||
def getName(self):
|
||||
pass
|
||||
|
||||
def isVideoFormat(self, url):
|
||||
pass
|
||||
|
||||
def manualVideoCheck(self):
|
||||
pass
|
||||
|
||||
def destroy(self):
|
||||
pass
|
||||
|
||||
headers = {
|
||||
'AppID': '534',
|
||||
'app_id': '534',
|
||||
'version': '1.0.3',
|
||||
'package': 'com.hjmore.wallpaper',
|
||||
'user_id': '3507f394e83d2424',
|
||||
'user-id': '3507f394e83d2424',
|
||||
'app_name': 'lanlan',
|
||||
'app-name': 'lanlan',
|
||||
'Content-Type': 'application/json; charset=utf-8;',
|
||||
'User-Agent': 'okhttp/4.9.0'
|
||||
}
|
||||
|
||||
def homeContent(self, filter):
|
||||
hdata=self.getdata('/api.php/provide/index',self.getbody({'tid':'0'}))
|
||||
vlist=hdata['data'].get('tj',[])
|
||||
result = {}
|
||||
classes = []
|
||||
filters = {}
|
||||
for i in hdata['data']['sub_data']:
|
||||
id=str(i['type_id'])
|
||||
classes.append({'type_id': id, 'type_name': i['type_name']})
|
||||
if len(i['data']):
|
||||
vlist.extend(i['data'])
|
||||
with ThreadPoolExecutor(max_workers=len(classes)) as executor:
|
||||
results = executor.map(self.getf, classes)
|
||||
for id, ft in results:
|
||||
if len(ft):filters[id] = ft
|
||||
result['class'] = classes
|
||||
result['filters'] = filters
|
||||
result['list'] = vlist
|
||||
return result
|
||||
|
||||
def homeVideoContent(self):
|
||||
pass
|
||||
|
||||
def categoryContent(self, tid, pg, filter, extend):
|
||||
body={
|
||||
"tid": tid,
|
||||
"type": extend.get('type'),
|
||||
"lang": extend.get('lang'),
|
||||
"area": extend.get('area'),
|
||||
"year": extend.get('year'),
|
||||
"pg": pg
|
||||
}
|
||||
body = {k: v for k, v in body.items() if v is not None and v != ""}
|
||||
data=self.getdata('/api.php/provide/nav',self.getbody(body))
|
||||
result = {}
|
||||
result['list'] = data['data']['data']
|
||||
result['page'] = pg
|
||||
result['pagecount'] = 9999
|
||||
result['limit'] = 90
|
||||
result['total'] = 999999
|
||||
return result
|
||||
pass
|
||||
|
||||
def detailContent(self, ids):
|
||||
data=self.getdata('/api.php/provide/vod',self.getbody({'ids':ids[0]}))
|
||||
vod=data['data']
|
||||
plist=[]
|
||||
names=[]
|
||||
for i in vod['vod_play_url']:
|
||||
ulist=[]
|
||||
names.append(i['name'].split(' ')[0])
|
||||
jdata={'parse':''}
|
||||
if i.get('parse') and isinstance(i['parse'], list) and len(i['parse']):
|
||||
jdata['parse']=self.e64(json.dumps(i['parse']))
|
||||
for j in i['data']:
|
||||
jdata['url']=j['url']
|
||||
ulist.append(f'{j["name"]}${self.e64(json.dumps(jdata))}')
|
||||
plist.append('#'.join(ulist))
|
||||
vod['vod_play_from']='$$$'.join(names)
|
||||
vod['vod_play_url']='$$$'.join(plist)
|
||||
vod.pop('cover_list', None)
|
||||
return {'list':[vod]}
|
||||
|
||||
def searchContent(self, key, quick, pg="1"):
|
||||
body={"wd":key,"tid":"0","pg":pg}
|
||||
data=self.getdata('/api.php/provide/search',self.getbody(body))
|
||||
vlist=[]
|
||||
for i in data['data']:
|
||||
i.pop('vod_play_from', None)
|
||||
vlist.append(i)
|
||||
return {'list':vlist,'page':pg}
|
||||
|
||||
def playerContent(self, flag, id, vipFlags):
|
||||
data=json.loads(self.d64(id))
|
||||
parse=data.get('parse')
|
||||
url,p,head = data.get('url'),1,''
|
||||
if parse:
|
||||
parse=json.loads(self.d64(parse))
|
||||
if not re.search(r'\.m3u8|.mp4|\.flv', url) and parse:
|
||||
for p in parse:
|
||||
try:
|
||||
data=self.fetch(f'{p}{url}',self.headers).json()
|
||||
url=data.get('data',{}).get('url') or data.get('url')
|
||||
head=data.get('data',{}).get('header') or data.get('header')
|
||||
p=0
|
||||
break
|
||||
except:
|
||||
p,url=1,data.get('url')
|
||||
head = {'User-Agent': 'okhttp/4.9.0'}
|
||||
return {'parse': p, 'url': url, 'header': head}
|
||||
|
||||
def localProxy(self, param):
|
||||
pass
|
||||
|
||||
def getf(self, map):
|
||||
ft,id =[], map['type_id']
|
||||
try:
|
||||
fdata = self.getdata('/api.php/provide/nav', self.getbody({'tid': id, 'pg': '1'}))
|
||||
dy = ['area', 'year', 'lang', 'type']
|
||||
fd = fdata['data']['type_extend']
|
||||
has_non_empty_field = False
|
||||
for key in dy:
|
||||
if key in fd and fd[key].strip() != "":
|
||||
has_non_empty_field = True
|
||||
break
|
||||
if has_non_empty_field:
|
||||
for dkey in fd:
|
||||
if dkey in dy and fd[dkey].strip() != "":
|
||||
values = fd[dkey].split(",")
|
||||
value_array = [{"n": value.strip(), "v": value.strip()} for value in values if
|
||||
value.strip() != ""]
|
||||
ft.append({"key": dkey, "name": dkey, "value": value_array})
|
||||
return (id, ft)
|
||||
except:
|
||||
return (id, ft)
|
||||
|
||||
def getskey(self):
|
||||
random_bytes = os.urandom(16)
|
||||
return binascii.hexlify(random_bytes).decode()
|
||||
|
||||
def getohost(self):
|
||||
url='https://bianyuan001.oss-cn-beijing.aliyuncs.com/huidu1.0.0.json'
|
||||
response = self.fetch(url, headers=self.headers).json()
|
||||
return response['servers'][0]
|
||||
|
||||
def gethost(self):
|
||||
body={
|
||||
"gr_rp_size": "1080*2272",
|
||||
"gr_app_list": "%E5%B1%8F%E5%B9%95%E5%BD%95%E5%88%B6%EF%BC%88com.miui.screenrecorder%29%0A%E5%A4%B8%E5%85%8B%EF%BC%88com.quark.browser%29%0A%E8%BE%B9%E7%BC%98%E8%A7%86%E9%A2%91%EF%BC%88com.hjmore.wallpaper%29%0A%E5%93%94%E5%93%A9%E5%93%94%E5%93%A9%EF%BC%88tv.danmaku.bili%29%0A%E7%81%AB%E6%98%9F%E6%90%9C%E9%A2%98%EF%BC%88com.fenbi.android.souti%29%0A%E6%94%AF%E4%BB%98%E5%AE%9D%EF%BC%88com.eg.android.AlipayGphone%29%0AWPS%20Office%EF%BC%88cn.wps.moffice_eng%29",
|
||||
"gr_lal": "0.0%2C0.0",
|
||||
"gr_system_type": "android",
|
||||
"gr_device_imei": "3507f394e83d2424",
|
||||
"gr_app_version": "1.0.3",
|
||||
"gr_device_model": "Xiaomi%20M2012K10C%20%28Android%20%E7%89%88%E6%9C%AC%3A%2011%2C%20SDK%E7%89%88%E6%9C%AC%3A%2030%29",
|
||||
"gr_city": "%E8%B4%B5%E5%B7%9E%2C%E6%9C%AA%E7%9F%A5%2C%E6%9C%AA%E7%9F%A5",
|
||||
"requestId": self.uuid(),
|
||||
"timeStamp": str(int(time.time() * 1000)),
|
||||
"version": "1.0.3",
|
||||
"package": "com.hjmore.wallpaper",
|
||||
"userLoginToken": "",
|
||||
"app_id": "534",
|
||||
"appName": 2131951658,
|
||||
"device_id": "3507f394e83d2424",
|
||||
"device-id": "3507f394e83d2424",
|
||||
"oaid": "",
|
||||
"imei": "",
|
||||
"referer_shop": "边缘影视",
|
||||
"referer-shop": "边缘影视",
|
||||
"access_fine_location": 0,
|
||||
"access-fine-location": 0
|
||||
}
|
||||
ohost = self.getohost()
|
||||
data=self.getdata(f'/api.php/settings/grayscale_list',body,ohost)
|
||||
parsed_url = urlparse(data['data']['grayscale']['server_url'][0])
|
||||
domain = parsed_url.scheme + "://" + parsed_url.netloc
|
||||
return domain
|
||||
|
||||
def drsa(self, encrypted_data):
|
||||
private_key_pem = """-----BEGIN RSA PRIVATE KEY-----
|
||||
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDA5NWiAwRjH50/
|
||||
IJY1N0zLopa4jpuWE7kWMn1Qunu6SjBgTvNRmRUoPDHn54haLfbfXIa2X+/sIaMB
|
||||
/O3HhrpVsz55E5W2vpZ5fBYWh+M65bQERKTW+l72H7GR9x0yj3QPByzzfsj/QkyP
|
||||
81prpwR9i8yMe7yG9TFKqUQCPE+/GrhNU1Qf6nFmV+vMnlP9DantkwAt4fPOMZn3
|
||||
j4da65/1YQV+F5bYzaLenNVKbHf8U8fVYLZWIy4yk2Vpe4R2Z+JX/eHWsChE9hOu
|
||||
iFm02eTW5NJLZlWUxYrSE23VXi8oXSEdON3UEOrwSdAUh4SXxLZ9U7KpNVdTwWyR
|
||||
AS4GyzJ/AgMBAAECggEBAKzmcXefLLeNBu4mz30z7Go7es5DRcLoOudiqmFKRs1c
|
||||
4q/xFLj3drdx/WnZZ6ctvDPKRBYFOJF4NRz7Ekfew/c9i6oLnA8KFuceCs53T37j
|
||||
ltCclwT7t1L2ZbxovIsteuJdlDVOV+w2CVqez1Xfh27heKAT6ZEvBtfdkVBPr0uj
|
||||
oVwa2+XlJmYZw5dHeB7ySVeAQ+69zDuADB8OWxPWsv6Del+Fhf0kTHAw4WgqcYsd
|
||||
JUunCjgLdJUlDgXzH/M/Nj8NYVEuq6QpmhaktJ4fwn/F7u3lQllVCFKj5lr0Xb92
|
||||
y7lvQlGqMKX1oxf+P5c5/vie1kDx1Rj4S++flIcVlUECgYEA4BuxCZ1c8oOF98bs
|
||||
KTAONnnZniQ1BRt7rA+O9+++lDjxJhxkuthwjB9YzrnZtxHJtvIIie9Jv8MVfzHa
|
||||
p2woDtiEh3YYwmIlgNUFvTcGe++tTiEiLDcGc/xNhpvfbLaw9QB7/HQ+LT1QCMxJ
|
||||
ufdBrR98l0khIGjYqxDW3W5pV70CgYEA3Ff/9+GM2XI/EUSTYrpnwp5R5OsXz1DL
|
||||
3CFFgp1EPCNk/c3YNWnrUtTkfmKAlRqWIHfphvH/jS6jpGrfRxDggPwGMtBc134b
|
||||
brIM5i4KNj/EcE+w5g03HaKBf1ZihHDQ53c6wTn6IFOHJNSPRLqMNqRymfbclNyO
|
||||
lBMHQmB8yOsCgYBCdZPTwRnuRTi2WQRx1nFwkEQL1Lrwb80GInsIZc2DkTtaTPNG
|
||||
QadmtmkUrSK2Wo0SNsZ3eUHKn2TBmpw4KCfc9zKeJVSEWKy8fu+7xBSlLlebotHK
|
||||
gOrl/H1VHOZuC+OAVItwO1yw98zDPynh/0Q3ve2pw6MSRGV0nYLKmdKdlQKBgQCJ
|
||||
Ty1rw1qKhu9WS22tMIxIc3CFPxtvTeI8I1+1rVtAPq5Im2YIoyDKVXCucaO/RvoW
|
||||
8aLNPTELQe0oIJFTL+k3d9ZFBCNXBncB3GK9biNe+w3nD0IlmkamaQZZ2/M4pTUJ
|
||||
iPtMPlzomCS3ht5g7f9CbegcmgGLooYXMGRtsMMSUQKBgQCoj+3UciH2i+HyUla5
|
||||
1FxivjH3MqSTE4Q7OdzrELb6DoLYzjgWAbpG8HIuodD4uG5xz1oR5H7vkblf1itB
|
||||
hwOwDEiabyX76e/I3Q0ovwBV+9PMjM4UVU0kHoiu3Z2s90ckwNh58w3QH5fn9E0b
|
||||
fqMnB6uWze+xrXWijaOzVZhIZg==
|
||||
-----END RSA PRIVATE KEY-----"""
|
||||
private_key = RSA.import_key(private_key_pem)
|
||||
cipher = PKCS1_v1_5.new(private_key)
|
||||
decrypted_data = cipher.decrypt(b64decode(encrypted_data), None)
|
||||
return decrypted_data.decode('utf-8')
|
||||
|
||||
def ersa(self, data):
|
||||
public_key = """-----BEGIN PUBLIC KEY-----
|
||||
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA+0QMb3WDXjNBRovRhTLH
|
||||
g3d+CliZAva2tepWNNN0Pj6DgE3ZTnPR34iL/cjo9Jbd3dqAJs/YkKnFurGkDxz5
|
||||
TthIqvmz244wiFcHt+FGWoJsj5ZVvrH3pPwH85ggmI1DjxSJEUhB12Z9X6FGli8D
|
||||
drR9xeLe5y8vFekux8xCQ7pwH1mNQu4Wy32WVM8aLjmRjNzEWOvEMAWCRuwymEdS
|
||||
zlWoH53qk1dqd6DAmOJhWU2hH6Yt2ZY9LTaDGiHrS+g0DuwajAQzhbM8eonGYMph
|
||||
nP4q0UTHWEfaGR3HoILmeM32M+qF/UCGfgfR6tCMiXPoHwnD2zoxbZ2p+QlYuTZL
|
||||
vQIDAQAB
|
||||
-----END PUBLIC KEY-----"""
|
||||
key = RSA.importKey(public_key)
|
||||
cipher = PKCS1_v1_5.new(key)
|
||||
encrypted = cipher.encrypt(data.encode())
|
||||
return b64encode(encrypted).decode()
|
||||
|
||||
def eaes(self, data, key):
|
||||
key = key.encode('utf-8')
|
||||
cipher = AES.new(key, AES.MODE_ECB)
|
||||
padded = pad(data.encode('utf-8'), AES.block_size)
|
||||
encrypted = cipher.encrypt(padded)
|
||||
word = b64encode(encrypted).decode('utf-8')
|
||||
return word
|
||||
|
||||
def daes(self, encrypted_data, key):
|
||||
key = key.encode('utf-8')
|
||||
cipher = AES.new(key, AES.MODE_ECB)
|
||||
encrypted = b64decode(encrypted_data)
|
||||
decrypted = cipher.decrypt(encrypted)
|
||||
unpadded = unpad(decrypted, AES.block_size)
|
||||
return unpadded.decode('utf-8')
|
||||
|
||||
def getbody(self,params=None):
|
||||
body = {
|
||||
"requestId": self.uuid(),
|
||||
"timeStamp": str(int(time.time()*1000)),
|
||||
"version": "1.0.3",
|
||||
"package": "com.hjmore.wallpaper",
|
||||
"userLoginToken": "",
|
||||
"app_id": "534",
|
||||
"appName": 2131951658,
|
||||
"device_id": "3507f394e83d2424",
|
||||
"device-id": "3507f394e83d2424",
|
||||
"oaid": "",
|
||||
"imei": "",
|
||||
"referer_shop": "边缘影视",
|
||||
"referer-shop": "边缘影视",
|
||||
"access_fine_location": 0,
|
||||
"access-fine-location": 0
|
||||
}
|
||||
if params:
|
||||
body.update(params)
|
||||
return body
|
||||
|
||||

    def getdata(self, path, body, host=None):
        # Envelope: md5-sign the JSON body, RSA-encrypt {key, sign} into the Sign header,
        # AES-encrypt the body itself, and AES-decrypt the response when it is signed.
        jdata = json.dumps(body)
        msign = self.md5(jdata)
        skey = self.getskey()
        jsign = {'key': skey, 'sign': msign}
        Sign = self.ersa(json.dumps(jsign))
        header = self.headers.copy()
        header['Sign'] = Sign
        dbody = self.eaes(jdata, skey)
        response = self.post(f'{host or self.host}{path}', headers=header, data=dbody)
        rdata = response.text
        if response.headers.get('Sign'):
            dkey = self.drsa(response.headers['Sign'])
            rdata = self.daes(rdata, dkey)
        return json.loads(rdata)
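
    # Request-flow sketch (illustrative only; assumes getskey() returns the AES key this
    # API expects, and the parameter values below are hypothetical):
    #   jdata = json.dumps(self.getbody({"vod_id": "123"}))
    #   skey  = self.getskey()
    #   Sign  = self.ersa(json.dumps({"key": skey, "sign": self.md5(jdata)}))  # request header
    #   body  = self.eaes(jdata, skey)                                         # POST payload
    # A signed response goes the other way round: drsa() recovers the reply key from the
    # response's Sign header, then daes() decrypts the body before json.loads().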

    def e64(self, text):
        try:
            text_bytes = text.encode('utf-8')
            encoded_bytes = b64encode(text_bytes)
            return encoded_bytes.decode('utf-8')
        except Exception as e:
            print(f"Base64编码错误: {str(e)}")
            return ""

    def d64(self, encoded_text):
        try:
            encoded_bytes = encoded_text.encode('utf-8')
            decoded_bytes = b64decode(encoded_bytes)
            return decoded_bytes.decode('utf-8')
        except Exception as e:
            print(f"Base64解码错误: {str(e)}")
            return ""

    def md5(self, text):
        h = MD5.new()
        h.update(text.encode('utf-8'))
        return h.hexdigest()

    def uuid(self):
        # The stdlib uuid module is not in this file's import list, so import it locally.
        import uuid
        return str(uuid.uuid4())
210
PY/分享金牌.py
Normal file
@@ -0,0 +1,210 @@
# -*- coding: utf-8 -*-
# by @嗷呜
import json
import sys
import threading
import uuid
import requests
sys.path.append('..')
from base.spider import Spider
import time
from Crypto.Hash import MD5, SHA1

class Spider(Spider):

    def init(self, extend=""):
        if extend:
            hosts = json.loads(extend)['site']
            self.host = self.host_late(hosts)
        pass

    def getName(self):
        pass

    def isVideoFormat(self, url):
        pass

    def manualVideoCheck(self):
        pass

    def destroy(self):
        pass

    def homeContent(self, filter):
        cdata = self.fetch(f"{self.host}/api/mw-movie/anonymous/get/filer/type", headers=self.getheaders()).json()
        fdata = self.fetch(f"{self.host}/api/mw-movie/anonymous/v1/get/filer/list", headers=self.getheaders()).json()
        result = {}
        classes = []
        filters = {}
        for k in cdata['data']:
            classes.append({
                'type_name': k['typeName'],
                'type_id': str(k['typeId']),
            })
        sort_values = [{"n": "最近更新", "v": "2"}, {"n": "人气高低", "v": "3"}, {"n": "评分高低", "v": "4"}]
        for tid, d in fdata['data'].items():
            current_sort_values = sort_values.copy()
            if tid == '1':
                del current_sort_values[0]
            filters[tid] = [
                {"key": "type", "name": "类型",
                 "value": [{"n": i["itemText"], "v": i["itemValue"]} for i in d["typeList"]]},

                *([] if not d["plotList"] else [{"key": "v_class", "name": "剧情",
                                                 "value": [{"n": i["itemText"], "v": i["itemText"]}
                                                           for i in d["plotList"]]}]),

                {"key": "area", "name": "地区",
                 "value": [{"n": i["itemText"], "v": i["itemText"]} for i in d["districtList"]]},

                {"key": "year", "name": "年份",
                 "value": [{"n": i["itemText"], "v": i["itemText"]} for i in d["yearList"]]},

                {"key": "lang", "name": "语言",
                 "value": [{"n": i["itemText"], "v": i["itemText"]} for i in d["languageList"]]},

                {"key": "sort", "name": "排序", "value": current_sort_values}
            ]
        result['class'] = classes
        result['filters'] = filters
        return result

    def homeVideoContent(self):
        data1 = self.fetch(f"{self.host}/api/mw-movie/anonymous/v1/home/all/list", headers=self.getheaders()).json()
        data2 = self.fetch(f"{self.host}/api/mw-movie/anonymous/home/hotSearch", headers=self.getheaders()).json()
        data = []
        for i in data1['data'].values():
            data.extend(i['list'])
        data.extend(data2['data'])
        vods = self.getvod(data)
        return {'list': vods}

    def categoryContent(self, tid, pg, filter, extend):
        params = {
            "area": extend.get('area', ''),
            "filterStatus": "1",
            "lang": extend.get('lang', ''),
            "pageNum": pg,
            "pageSize": "30",
            "sort": extend.get('sort', '1'),
            "sortBy": "1",
            "type": extend.get('type', ''),
            "type1": tid,
            "v_class": extend.get('v_class', ''),
            "year": extend.get('year', '')
        }
        data = self.fetch(f"{self.host}/api/mw-movie/anonymous/video/list?{self.js(params)}", headers=self.getheaders(params)).json()
        result = {}
        result['list'] = self.getvod(data['data']['list'])
        result['page'] = pg
        result['pagecount'] = 9999
        result['limit'] = 90
        result['total'] = 999999
        return result

    def detailContent(self, ids):
        data = self.fetch(f"{self.host}/api/mw-movie/anonymous/video/detail?id={ids[0]}", headers=self.getheaders({'id': ids[0]})).json()
        vod = self.getvod([data['data']])[0]
        vod['vod_play_from'] = '分享者在线'
        vod['vod_play_url'] = '#'.join(
            f"{i['name'] if len(vod['episodelist']) > 1 else vod['vod_name']}${ids[0]}@@{i['nid']}" for i in
            vod['episodelist'])
        vod.pop('episodelist', None)
        return {'list': [vod]}
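
    # Each play entry built above has the form "<episode name>$<vodId>@@<nid>";
    # playerContent() splits on "@@" to recover vodId and nid again.
    # Illustrative value (ids hypothetical): "第01集$12345@@1".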

    def searchContent(self, key, quick, pg="1"):
        params = {
            "keyword": key,
            "pageNum": pg,
            "pageSize": "8",
            "sourceCode": "1"
        }
        data = self.fetch(f"{self.host}/api/mw-movie/anonymous/video/searchByWord?{self.js(params)}", headers=self.getheaders(params)).json()
        vods = self.getvod(data['data']['result']['list'])
        return {'list': vods, 'page': pg}

    def playerContent(self, flag, id, vipFlags):
        self.header = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; ) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.6478.61 Chrome/126.0.6478.61 Not/A)Brand/8 Safari/537.36',
            'sec-ch-ua-platform': '"Windows"',
            'DNT': '1',
            'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="126", "Google Chrome";v="126"',
            'sec-ch-ua-mobile': '?0',
            'Origin': self.host,
            'Referer': f'{self.host}/'
        }
        ids = id.split('@@')
        pdata = self.fetch(f"{self.host}/api/mw-movie/anonymous/v2/video/episode/url?clientType=1&id={ids[0]}&nid={ids[1]}", headers=self.getheaders({'clientType': '1', 'id': ids[0], 'nid': ids[1]})).json()
        vlist = []
        for i in pdata['data']['list']: vlist.extend([i['resolutionName'], i['url']])
        return {'parse': 0, 'url': vlist, 'header': self.header}

    def localProxy(self, param):
        pass

    def host_late(self, url_list):
        if isinstance(url_list, str):
            urls = [u.strip() for u in url_list.split(',')]
        else:
            urls = url_list
        if len(urls) <= 1:
            return urls[0] if urls else ''

        results = {}
        threads = []

        def test_host(url):
            try:
                start_time = time.time()
                response = requests.head(url, timeout=1.0, allow_redirects=False)
                delay = (time.time() - start_time) * 1000
                results[url] = delay
            except Exception as e:
                results[url] = float('inf')
        for url in urls:
            t = threading.Thread(target=test_host, args=(url,))
            threads.append(t)
            t.start()
        for t in threads:
            t.join()
        return min(results.items(), key=lambda x: x[1])[0]
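
    # host_late() races a HEAD request against every candidate host in parallel and keeps
    # the fastest responder; unreachable hosts score float('inf') and are never chosen.
    # Illustrative call (hosts hypothetical):
    #   self.host_late("https://a.example.com,https://b.example.com")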

    def md5(self, sign_key):
        md5_hash = MD5.new()
        md5_hash.update(sign_key.encode('utf-8'))
        md5_result = md5_hash.hexdigest()
        return md5_result

    def js(self, param):
        return '&'.join(f"{k}={v}" for k, v in param.items())

    def getheaders(self, param=None):
        if param is None: param = {}
        t = str(int(time.time()*1000))
        param['key'] = 'cb808529bae6b6be45ecfab29a4889bc'
        param['t'] = t
        sha1_hash = SHA1.new()
        sha1_hash.update(self.md5(self.js(param)).encode('utf-8'))
        sign = sha1_hash.hexdigest()
        deviceid = str(uuid.uuid4())
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; ) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.6478.61 Chrome/126.0.6478.61 Not/A)Brand/8 Safari/537.36',
            'Accept': 'application/json, text/plain, */*',
            'sign': sign,
            't': t,
            'deviceid': deviceid
        }
        return headers
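
    # Signing scheme used by getheaders(), step by step (parameter values hypothetical):
    #   param = {"id": "1"}        # becomes {"id": "1", "key": "cb80...", "t": "<ms timestamp>"}
    #   qs    = self.js(param)     # "id=1&key=cb80...&t=1700000000000"
    #   sign  = SHA1 hex digest of self.md5(qs) (the hex MD5 string, not its raw bytes)
    # The resulting "sign", "t" and a random "deviceid" are sent with every request.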

    def convert_field_name(self, field):
        field = field.lower()
        if field.startswith('vod') and len(field) > 3:
            field = field.replace('vod', 'vod_')
        if field.startswith('type') and len(field) > 4:
            field = field.replace('type', 'type_')
        return field

    def getvod(self, array):
        return [{self.convert_field_name(k): v for k, v in item.items()} for item in array]
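
    # Field-name mapping example (keys hypothetical): the upstream API returns keys like
    # "vodName" / "typeName", which getvod() rewrites to the snake_case names used
    # downstream, e.g.
    #   getvod([{"vodName": "Foo", "vodPic": "x.jpg"}]) -> [{"vod_name": "Foo", "vod_pic": "x.jpg"}]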
209
PY/分享魔方.py
Normal file
@@ -0,0 +1,209 @@
import re
import sys
from Crypto.Hash import MD5
sys.path.append("..")
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from urllib.parse import quote, urlparse
from base64 import b64encode, b64decode
import json
import time
from base.spider import Spider


class Spider(Spider):

    def init(self, extend=""):
        self.host = self.gethost()
        pass

    def isVideoFormat(self, url):
        pass

    def manualVideoCheck(self):
        pass

    def action(self, action):
        pass

    def destroy(self):
        pass

    def homeContent(self, filter):
        data = self.getdata("/api.php/getappapi.index/initV119")
        dy = {"class": "类型", "area": "地区", "lang": "语言", "year": "年份", "letter": "字母", "by": "排序",
              "sort": "排序"}
        filters = {}
        classes = []
        json_data = data["type_list"]
        homedata = data["banner_list"][8:]
        for item in json_data:
            if item["type_name"] == "全部":
                continue
            has_non_empty_field = False
            jsontype_extend = json.loads(item["type_extend"])
            homedata.extend(item["recommend_list"])
            jsontype_extend["sort"] = "最新,最热,最赞"
            classes.append({"type_name": item["type_name"], "type_id": item["type_id"]})
            for key in dy:
                if key in jsontype_extend and jsontype_extend[key].strip() != "":
                    has_non_empty_field = True
                    break
            if has_non_empty_field:
                filters[str(item["type_id"])] = []
                for dkey in jsontype_extend:
                    if dkey in dy and jsontype_extend[dkey].strip() != "":
                        values = jsontype_extend[dkey].split(",")
                        value_array = [{"n": value.strip(), "v": value.strip()} for value in values if
                                       value.strip() != ""]
                        filters[str(item["type_id"])].append({"key": dkey, "name": dy[dkey], "value": value_array})
        result = {}
        result["class"] = classes
        result["filters"] = filters
        result["list"] = homedata[1:]
        return result

    def homeVideoContent(self):
        pass

    def categoryContent(self, tid, pg, filter, extend):
        body = {"area": extend.get('area', '全部'), "year": extend.get('year', '全部'), "type_id": tid, "page": pg,
                "sort": extend.get('sort', '最新'), "lang": extend.get('lang', '全部'),
                "class": extend.get('class', '全部')}
        result = {}
        data = self.getdata("/api.php/getappapi.index/typeFilterVodList", body)
        result["list"] = data["recommend_list"]
        result["page"] = pg
        result["pagecount"] = 9999
        result["limit"] = 90
        result["total"] = 999999
        return result

    def detailContent(self, ids):
        body = f"vod_id={ids[0]}"
        data = self.getdata("/api.php/getappapi.index/vodDetail", body)
        vod = data["vod"]
        play = []
        names = []
        for itt in data["vod_play_list"]:
            a = []
            names.append(itt["player_info"]["show"])
            for it in itt['urls']:
                it['user_agent'] = itt["player_info"].get("user_agent")
                it["parse"] = itt["player_info"].get("parse")
                a.append(f"{it['name']}${self.e64(json.dumps(it))}")
            play.append("#".join(a))
        vod["vod_play_from"] = "$$$".join(names)
        vod["vod_play_url"] = "$$$".join(play)
        result = {"list": [vod]}
        return result

    def searchContent(self, key, quick, pg="1"):
        body = f"keywords={key}&type_id=0&page={pg}"
        data = self.getdata("/api.php/getappapi.index/searchList", body)
        result = {"list": data["search_list"], "page": pg}
        return result

    def playerContent(self, flag, id, vipFlags):
        ids = json.loads(self.d64(id))
        h = {"User-Agent": (ids['user_agent'] or "okhttp/3.14.9")}
        try:
            if re.search(r'url=', ids['parse_api_url']):
                data = self.fetch(ids['parse_api_url'], headers=h, timeout=10).json()
                url = data.get('url') or data['data'].get('url')
            else:
                body = f"parse_api={ids.get('parse') or ids['parse_api_url'].replace(ids['url'], '')}&url={quote(self.aes(ids['url'], True))}&token={ids.get('token')}"
                b = self.getdata("/api.php/getappapi.index/vodParse", body)['json']
                url = json.loads(b)['url']
            if 'error' in url: raise ValueError(f"解析失败: {url}")
            p = 0
        except Exception as e:
            print('错误信息:', e)
            url, p = ids['url'], 1

        if re.search(r'\.jpg|\.png|\.jpeg', url):
            url = self.Mproxy(url)
        result = {}
        result["parse"] = p
        result["url"] = url
        result["header"] = h
        return result

    def localProxy(self, param):
        return self.Mlocal(param)

    def gethost(self):
        headers = {
            'User-Agent': 'okhttp/3.14.9'
        }
        response = self.fetch('https://snysw.xyz/mfys.txt', headers=headers).text
        return response.strip()

    def aes(self, text, b=None):
        key = b"1234567887654321"
        cipher = AES.new(key, AES.MODE_CBC, key)
        if b:
            ct_bytes = cipher.encrypt(pad(text.encode("utf-8"), AES.block_size))
            ct = b64encode(ct_bytes).decode("utf-8")
            return ct
        else:
            pt = unpad(cipher.decrypt(b64decode(text)), AES.block_size)
            return pt.decode("utf-8")

    def header(self):
        t = str(int(time.time()))
        header = {"Referer": self.host,
                  "User-Agent": "okhttp/3.14.9", "app-version-code": "140", "app-ui-mode": "light",
                  "app-api-verify-time": t, "app-user-device-id": self.md5(t),
                  "app-api-verify-sign": self.aes(t, True),
                  "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"}
        return header

    def getdata(self, path, data=None):
        vdata = self.post(f"{self.host}{path}", headers=self.header(), data=data, timeout=10).json()['data']
        data1 = self.aes(vdata)
        return json.loads(data1)
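
    # Transport sketch: the API wraps every JSON payload in AES (see aes()) and returns it
    # in the response's "data" field, so a call like
    #   self.getdata("/api.php/getappapi.index/initV119")
    # posts with the verify headers from header() and yields the decrypted, parsed JSON.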

    def Mproxy(self, url):
        return f"{self.getProxyUrl()}&url={self.e64(url)}&type=m3u8"
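
    # The proxied address handed to the player looks roughly like
    #   <getProxyUrl()>&url=<base64 of the real url>&type=m3u8
    # and is resolved again by localProxy() -> Mlocal() when the player requests it.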

    def Mlocal(self, param, header=None):
        url = self.d64(param["url"])
        ydata = self.fetch(url, headers=header, allow_redirects=False)
        data = ydata.content.decode('utf-8')
        if ydata.headers.get('Location'):
            url = ydata.headers['Location']
            data = self.fetch(url, headers=header).content.decode('utf-8')
        parsed_url = urlparse(url)
        durl = parsed_url.scheme + "://" + parsed_url.netloc
        lines = data.strip().split('\n')
        for index, string in enumerate(lines):
            # Rewrite relative segment URIs to absolute ones, keeping the full relative
            # path (directory and file name) so the rewritten entries stay playable.
            if '#EXT' not in string and 'http' not in string:
                lines[index] = durl + ('' if string.startswith('/') else '/') + string
        data = '\n'.join(lines)
        return [200, "application/vnd.apple.mpegurl", data]

    def e64(self, text):
        try:
            text_bytes = text.encode('utf-8')
            encoded_bytes = b64encode(text_bytes)
            return encoded_bytes.decode('utf-8')
        except Exception as e:
            print(f"Base64编码错误: {str(e)}")
            return ""

    def d64(self, encoded_text):
        try:
            encoded_bytes = encoded_text.encode('utf-8')
            decoded_bytes = b64decode(encoded_bytes)
            return decoded_bytes.decode('utf-8')
        except Exception as e:
            print(f"Base64解码错误: {str(e)}")
            return ""

    def md5(self, text):
        h = MD5.new()
        h.update(text.encode('utf-8'))
        return h.hexdigest()
212
PY/分享鲢鱼.py
Normal file
@@ -0,0 +1,212 @@
import re
import sys
from Crypto.Hash import MD5
sys.path.append("..")
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from urllib.parse import quote, urlparse
from base64 import b64encode, b64decode
import json
import time
from base.spider import Spider


class Spider(Spider):

    def init(self, extend=""):
        self.host = 'http://47.122.22.78'
        self.did = self.getdid()
        pass

    def isVideoFormat(self, url):
        pass

    def manualVideoCheck(self):
        pass

    def action(self, action):
        pass

    def destroy(self):
        pass

    def homeContent(self, filter):
        data = self.getdata("/api.php/getappapi.index/initV119")
        dy = {"class": "类型", "area": "地区", "lang": "语言", "year": "年份", "letter": "字母", "by": "排序",
              "sort": "排序"}
        filters = {}
        classes = []
        json_data = data["type_list"]
        homedata = data["banner_list"][8:]
        for item in json_data:
            if item["type_name"] == "全部":
                continue
            has_non_empty_field = False
            jsontype_extend = json.loads(item["type_extend"])
            homedata.extend(item["recommend_list"])
            jsontype_extend["sort"] = "最新,最热,最赞"
            classes.append({"type_name": item["type_name"], "type_id": item["type_id"]})
            for key in dy:
                if key in jsontype_extend and jsontype_extend[key].strip() != "":
                    has_non_empty_field = True
                    break
            if has_non_empty_field:
                filters[str(item["type_id"])] = []
                for dkey in jsontype_extend:
                    if dkey in dy and jsontype_extend[dkey].strip() != "":
                        values = jsontype_extend[dkey].split(",")
                        value_array = [{"n": value.strip(), "v": value.strip()} for value in values if
                                       value.strip() != ""]
                        filters[str(item["type_id"])].append({"key": dkey, "name": dy[dkey], "value": value_array})
        result = {}
        result["class"] = classes
        result["filters"] = filters
        result["list"] = homedata[1:]
        return result

    def homeVideoContent(self):
        pass

    def categoryContent(self, tid, pg, filter, extend):
        body = {"area": extend.get('area', '全部'), "year": extend.get('year', '全部'), "type_id": tid, "page": pg,
                "sort": extend.get('sort', '最新'), "lang": extend.get('lang', '全部'),
                "class": extend.get('class', '全部')}
        result = {}
        data = self.getdata("/api.php/getappapi.index/typeFilterVodList", body)
        result["list"] = data["recommend_list"]
        result["page"] = pg
        result["pagecount"] = 9999
        result["limit"] = 90
        result["total"] = 999999
        return result

    def detailContent(self, ids):
        body = f"vod_id={ids[0]}"
        data = self.getdata("/api.php/getappapi.index/vodDetail", body)
        vod = data["vod"]
        play = []
        names = []
        for itt in data["vod_play_list"]:
            a = []
            names.append(itt["player_info"]["show"])
            for it in itt['urls']:
                it['user_agent'] = itt["player_info"].get("user_agent")
                it["parse"] = itt["player_info"].get("parse")
                a.append(f"{it['name']}${self.e64(json.dumps(it))}")
            play.append("#".join(a))
        vod["vod_play_from"] = "$$$".join(names)
        vod["vod_play_url"] = "$$$".join(play)
        result = {"list": [vod]}
        return result

    def searchContent(self, key, quick, pg="1"):
        body = f"keywords={key}&type_id=0&page={pg}"
        data = self.getdata("/api.php/getappapi.index/searchList", body)
        result = {"list": data["search_list"], "page": pg}
        return result

    def playerContent(self, flag, id, vipFlags):
        ids = json.loads(self.d64(id))
        h = {"User-Agent": (ids['user_agent'] or "okhttp/3.14.9")}
        try:
            if re.search(r'url=', ids['parse_api_url']):
                data = self.fetch(ids['parse_api_url'], headers=h, timeout=10).json()
                url = data.get('url') or data['data'].get('url')
            else:
                body = f"parse_api={ids.get('parse') or ids['parse_api_url'].replace(ids['url'], '')}&url={quote(self.aes(ids['url'], True))}&token={ids.get('token')}"
                b = self.getdata("/api.php/getappapi.index/vodParse", body)['json']
                url = json.loads(b)['url']
            if 'error' in url: raise ValueError(f"解析失败: {url}")
            p = 0
        except Exception as e:
            print('错误信息:', e)
            url, p = ids['url'], 1

        if re.search(r'\.jpg|\.png|\.jpeg', url):
            url = self.Mproxy(url)
        result = {}
        result["parse"] = p
        result["url"] = url
        result["header"] = h
        return result

    def localProxy(self, param):
        return self.Mlocal(param)

    def aes(self, text, b=None):
        key = b"1234567890123456"
        cipher = AES.new(key, AES.MODE_CBC, key)
        if b:
            ct_bytes = cipher.encrypt(pad(text.encode("utf-8"), AES.block_size))
            ct = b64encode(ct_bytes).decode("utf-8")
            return ct
        else:
            pt = unpad(cipher.decrypt(b64decode(text)), AES.block_size)
            return pt.decode("utf-8")

    def header(self):
        t = str(int(time.time()))
        header = {"Referer": self.host,
                  "User-Agent": "okhttp/3.14.9", "app-version-code": "101", "app-ui-mode": "light",
                  "app-api-verify-time": t, "app-user-device-id": self.did,
                  "app-api-verify-sign": self.aes(t, True),
                  "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"}
        return header

    def getdid(self):
        did = self.getCache('did')
        if not did:
            t = str(int(time.time()))
            did = self.md5(t)
            self.setCache('did', did)
        return did
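
    # getdid() derives a pseudo device id once (md5 of the current timestamp), caches it via
    # setCache/getCache, and header() reuses it so every request presents the same
    # "app-user-device-id" value.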

    def getdata(self, path, data=None):
        vdata = self.post(f"{self.host}{path}", headers=self.header(), data=data, timeout=10).json()['data']
        data1 = self.aes(vdata)
        return json.loads(data1)

    def Mproxy(self, url):
        return f"{self.getProxyUrl()}&url={self.e64(url)}&type=m3u8"

    def Mlocal(self, param, header=None):
        url = self.d64(param["url"])
        ydata = self.fetch(url, headers=header, allow_redirects=False)
        data = ydata.content.decode('utf-8')
        if ydata.headers.get('Location'):
            url = ydata.headers['Location']
            data = self.fetch(url, headers=header).content.decode('utf-8')
        parsed_url = urlparse(url)
        durl = parsed_url.scheme + "://" + parsed_url.netloc
        lines = data.strip().split('\n')
        for index, string in enumerate(lines):
            # Rewrite relative segment URIs to absolute ones, keeping the full relative
            # path (directory and file name) so the rewritten entries stay playable.
            if '#EXT' not in string and 'http' not in string:
                lines[index] = durl + ('' if string.startswith('/') else '/') + string
        data = '\n'.join(lines)
        return [200, "application/vnd.apple.mpegurl", data]

    def e64(self, text):
        try:
            text_bytes = text.encode('utf-8')
            encoded_bytes = b64encode(text_bytes)
            return encoded_bytes.decode('utf-8')
        except Exception as e:
            print(f"Base64编码错误: {str(e)}")
            return ""

    def d64(self, encoded_text):
        try:
            encoded_bytes = encoded_text.encode('utf-8')
            decoded_bytes = b64decode(encoded_bytes)
            return decoded_bytes.decode('utf-8')
        except Exception as e:
            print(f"Base64解码错误: {str(e)}")
            return ""

    def md5(self, text):
        h = MD5.new()
        h.update(text.encode('utf-8'))
        return h.hexdigest()