Add files via upload

This commit is contained in:
maoystv 2025-04-11 23:03:14 +08:00 committed by GitHub
parent b615c61cfd
commit 2e6bbe2a91
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 390 additions and 0 deletions

216
PY/国外剧APP.py Normal file
View File

@ -0,0 +1,216 @@
import re
import sys
from Crypto.Hash import MD5
sys.path.append("..")
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from urllib.parse import quote, urlparse
from base64 import b64encode, b64decode
import json
import time
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
self.host = 'https://guowaiju.com'
self.did=self.getdid()
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def action(self, action):
pass
def destroy(self):
pass
def homeContent(self, filter):
data = self.getdata("/api.php/getappapi.index/initV119")
dy = {"class": "类型", "area": "地区", "lang": "语言", "year": "年份", "letter": "字母", "by": "排序",
"sort": "排序"}
filters = {}
classes = []
json_data = data["type_list"]
homedata = data["banner_list"][8:]
for item in json_data:
if item["type_name"] == "全部":
continue
has_non_empty_field = False
jsontype_extend = json.loads(item["type_extend"])
homedata.extend(item["recommend_list"])
jsontype_extend["sort"] = "最新,最热,最赞"
classes.append({"type_name": item["type_name"], "type_id": item["type_id"]})
for key in dy:
if key in jsontype_extend and jsontype_extend[key].strip() != "":
has_non_empty_field = True
break
if has_non_empty_field:
filters[str(item["type_id"])] = []
for dkey in jsontype_extend:
if dkey in dy and jsontype_extend[dkey].strip() != "":
values = jsontype_extend[dkey].split(",")
value_array = [{"n": value.strip(), "v": value.strip()} for value in values if
value.strip() != ""]
filters[str(item["type_id"])].append({"key": dkey, "name": dy[dkey], "value": value_array})
result = {}
result["class"] = classes
result["filters"] = filters
result["list"] = homedata[1:]
return result
def homeVideoContent(self):
pass
def categoryContent(self, tid, pg, filter, extend):
body = {"area": extend.get('area', '全部'), "year": extend.get('year', '全部'), "type_id": tid, "page": pg,
"sort": extend.get('sort', '最新'), "lang": extend.get('lang', '全部'),
"class": extend.get('class', '全部')}
result = {}
data = self.getdata("/api.php/getappapi.index/typeFilterVodList", body)
result["list"] = data["recommend_list"]
result["page"] = pg
result["pagecount"] = 9999
result["limit"] = 90
result["total"] = 999999
return result
def detailContent(self, ids):
body = f"vod_id={ids[0]}"
data = self.getdata("/api.php/getappapi.index/vodDetail", body)
vod = data["vod"]
play = []
names = []
for itt in reversed(data["vod_play_list"]):
a = []
names.append(itt["player_info"]["show"])
for it in itt['urls']:
it['user_agent'] = itt["player_info"].get("user_agent")
it["parse"] = itt["player_info"].get("parse")
a.append(f"{it['name']}${self.e64(json.dumps(it))}")
play.append("#".join(a))
vod["vod_play_from"] = "$$$".join(names)
vod["vod_play_url"] = "$$$".join(play)
result = {"list": [vod]}
return result
def searchContent(self, key, quick, pg="1"):
body = f"keywords={key}&type_id=0&page={pg}"
data = self.getdata("/api.php/getappapi.index/searchList", body)
result = {"list": data["search_list"], "page": pg}
return result
def playerContent(self, flag, id, vipFlags):
ids = json.loads(self.d64(id))
h = {"User-Agent": (ids['user_agent'] or "okhttp/3.14.9")}
try:
if re.search(r'url=', ids['parse_api_url']):
data = self.fetch(ids['parse_api_url'], headers=h, timeout=10).json()
url = data.get('url') or data['data'].get('url')
else:
body = f"parse_api={ids.get('parse') or ids['parse_api_url'].replace(ids['url'], '')}&url={quote(self.aes(ids['url'], True))}&token={ids.get('token')}"
b = self.getdata("/api.php/getappapi.index/vodParse", body)['json']
url = json.loads(b)['url']
if 'error' in url: raise ValueError(f"解析失败: {url}")
p = 0
except Exception as e:
print('错误信息:', e)
url, p = ids['url'], 1
if re.search(r'\.jpg|\.png|\.jpeg', url):
url = self.Mproxy(url)
result = {}
result["parse"] = p
result["url"] = url
result["header"] = h
return result
def localProxy(self, param):
headers = {"User-Agent": "okhttp/3.14.9"}
url = self.d64(param['url'])
ydata = self.fetch(url, headers=headers, allow_redirects=False)
data = ydata.content.decode('utf-8')
if ydata.headers.get('Location'):
url = ydata.headers['Location']
data = self.fetch(url, headers=headers).content.decode('utf-8')
lines = data.strip().split('\n')
last_r = url[:url.rfind('/')]
parsed_url = urlparse(url)
durl = parsed_url.scheme + "://" + parsed_url.netloc
for index, string in enumerate(lines):
if '#EXT' not in string:
if 'http' not in string:
domain = last_r if string.count('/') < 2 else durl
string = domain + ('' if string.startswith('/') else '/') + string
if string.split('.')[-1].split('?')[0] == 'm3u8':
string = self.Mproxy(string)
lines[index] = string
data = '\n'.join(lines)
return [200, "application/vnd.apple.mpegur", data]
def getdid(self):
did=self.getCache('did')
if not did:
t = str(int(time.time()))
did = self.md5(t)
self.setCache('did', did)
return did
def aes(self, text, b=None):
key = b"7xv16h7qgkrs9b1p"
cipher = AES.new(key, AES.MODE_CBC, key)
if b:
ct_bytes = cipher.encrypt(pad(text.encode("utf-8"), AES.block_size))
ct = b64encode(ct_bytes).decode("utf-8")
return ct
else:
pt = unpad(cipher.decrypt(b64decode(text)), AES.block_size)
return pt.decode("utf-8")
def header(self):
t = str(int(time.time()))
header = {
"User-Agent": "okhttp/3.14.9", "app-version-code": "110", "app-ui-mode": "light",
"app-api-verify-time": t, "app-user-device-id": self.did,
"app-api-verify-sign": self.aes(t, True),
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"
}
return header
def getdata(self, path, data=None):
vdata = self.post(f"{self.host}{path}", headers=self.header(), data=data, timeout=10).json()['data']
data1 = self.aes(vdata)
return json.loads(data1)
def Mproxy(self, url):
return f"{self.getProxyUrl()}&url={self.e64(url)}&type=m3u8"
def e64(self, text):
try:
text_bytes = text.encode('utf-8')
encoded_bytes = b64encode(text_bytes)
return encoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64编码错误: {str(e)}")
return ""
def d64(self, encoded_text):
try:
encoded_bytes = encoded_text.encode('utf-8')
decoded_bytes = b64decode(encoded_bytes)
return decoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64解码错误: {str(e)}")
return ""
def md5(self, text):
h = MD5.new()
h.update(text.encode('utf-8'))
return h.hexdigest()

174
PY/小红影视.py Normal file
View File

@ -0,0 +1,174 @@
# -*- coding: utf-8 -*-
# by @嗷呜
import re
import sys
from base64 import b64decode
from Crypto.Cipher import AES
from Crypto.Hash import MD5
from Crypto.Util.Padding import unpad
sys.path.append("..")
import json
import time
from pyquery import PyQuery as pq
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
pass
def getName(self):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def action(self, action):
pass
def destroy(self):
pass
host='https://www.xiaohys.com'
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36',
'Accept': 'application/json, text/javascript, */*; q=0.01',
'sec-ch-ua-platform': '"macOS"',
'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="134", "Google Chrome";v="134"',
'Origin': host,
'Referer': f"{host}/",
}
def homeContent(self, filter):
data=self.getpq(self.fetch(self.host,headers=self.headers).text)
result = {}
classes = []
for k in data('.head-more.box a').items():
i=k.attr('href')
if i and '/show' in i:
classes.append({
'type_name': k.text(),
'type_id': i.split('/')[-1]
})
result['class'] = classes
result['list']=self.getlist(data('.border-box.diy-center .public-list-div'))
return result
def homeVideoContent(self):
pass
def categoryContent(self, tid, pg, filter, extend):
body = {'type':tid,'class':'','area':'','lang':'','version':'','state':'','letter':'','page':pg}
data = self.post(f"{self.host}/index.php/api/vod", headers=self.headers, data=self.getbody(body)).json()
result = {}
result['list'] = data['list']
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def detailContent(self, ids):
data = self.getpq(self.fetch(f"{self.host}/detail/{ids[0]}/", headers=self.headers).text)
v=data('.detail-info.lightSpeedIn .slide-info')
vod = {
'vod_year': v.eq(-1).text(),
'vod_remarks': v.eq(0).text(),
'vod_actor': v.eq(3).text(),
'vod_director': v.eq(2).text(),
'vod_content': data('.switch-box #height_limit').text()
}
np=data('.anthology.wow.fadeInUp')
ndata=np('.anthology-tab .swiper-wrapper .swiper-slide')
pdata=np('.anthology-list .anthology-list-box ul')
play,names=[],[]
for i in range(len(ndata)):
n=ndata.eq(i)('a')
n('span').remove()
names.append(n.text())
vs=[]
for v in pdata.eq(i)('li').items():
vs.append(f"{v.text()}${v('a').attr('href')}")
play.append('#'.join(vs))
vod["vod_play_from"] = "$$$".join(names)
vod["vod_play_url"] = "$$$".join(play)
result = {"list": [vod]}
return result
def searchContent(self, key, quick, pg="1"):
data = self.fetch(f"{self.host}/index.php/ajax/suggest?mid=1&wd={key}&limit=9999&timestamp={int(time.time()*1000)}", headers=self.headers).json()
videos=[]
for i in data['list']:
videos.append({
'vod_id': i['id'],
'vod_name': i['name'],
'vod_pic': i['pic']
})
return {'list':videos,'page':pg}
def playerContent(self, flag, id, vipFlags):
h,p,url1= {"User-Agent": "okhttp/3.14.9"},1,''
url=f"{self.host}{id}"
data = self.getpq(self.fetch(url, headers=self.headers).text)
try:
jstr = data('.player .player-left script').eq(0).text()
jsdata = json.loads(jstr.split('=',1)[-1])
body, url1= {'url': jsdata['url'],'referer':url},jsdata['url']
data = self.post(f"{self.host}/static/player/artplayer/api.php?ac=getdate", headers=self.headers, data=body).json()
l=self.aes(data['data'],data['iv'])
url=l.get('url') or l['data'].get('url')
p = 0
if not url:raise Exception('未找到播放地址')
except Exception as e:
print('错误信息:',e)
if re.search(r'\.m3u8|\.mp4',url1):url=url1
result = {}
result["parse"] = p
result["url"] = url
result["header"] = h
return result
def localProxy(self, param):
pass
def getbody(self, params):
t=int(time.time())
h = MD5.new()
h.update(f"DS{t}DCC147D11943AF75".encode('utf-8'))
key=h.hexdigest()
params.update({'time':t,'key':key})
return params
def getlist(self,data):
videos=[]
for i in data.items():
id = i('a').attr('href')
if id:
id = re.search(r'\d+', id).group(0)
img = i('img').attr('data-src')
if img and 'url=' in img and 'http' not in img: img = f'{self.host}{img}'
videos.append({
'vod_id': id,
'vod_name': i('img').attr('alt'),
'vod_pic': img,
'vod_remarks': i('.public-prt').text() or i('.public-list-prb').text()
})
return videos
def getpq(self, data):
try:
return pq(data)
except Exception as e:
print(f"{str(e)}")
return pq(data.encode('utf-8'))
def aes(self, text,iv):
key = b"d978a93ffb4d3a00"
iv = iv.encode("utf-8")
cipher = AES.new(key, AES.MODE_CBC, iv)
pt = unpad(cipher.decrypt(b64decode(text)), AES.block_size)
return json.loads(pt.decode("utf-8"))