Add files via upload

This commit is contained in:
maoystv 2025-04-06 10:21:28 +08:00 committed by GitHub
parent a1b09e5c34
commit 36efd48fd0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 1060 additions and 0 deletions

254
PY/CliCli动漫.py Normal file
View File

@ -0,0 +1,254 @@
# -*- coding: utf-8 -*-
# by @嗷呜
import base64
import json
import sys
import time
from base64 import b64decode, b64encode
from Crypto.Cipher import AES, PKCS1_v1_5
from Crypto.Hash import MD5
from Crypto.PublicKey import RSA
from Crypto.Util.Padding import unpad, pad
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
self.did=self.getdid()
pass
def getName(self):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
host='http://60.204.242.79:8091'
def homeContent(self, filter):
res = self.fetch(f'{self.host}/app/channel?top-level=true', headers=self.getheaders()).text
data = self.getdata(res)
result = {}
classes = []
filters = {}
sortsn = ['最新','最热','高分']
for k in data['data']:
classes.append({
'type_name': k['name'],
'type_id': k['id']
})
filters[k['id']] = []
k['sorts']=['addtime','hits','gold']
for key,value in k.items():
if type(value) == list:
filters[k['id']].append({
'name': key,
'key': key,
'value': [{'v': x,'n': x if key !='sorts' else sortsn[i]} for i,x in enumerate(value) if x]
})
result['class'] = classes
result['filters'] = filters
return result
def homeVideoContent(self):
res=self.fetch(f'{self.host}/app/banners/0',headers=self.getheaders()).text
data=self.getdata(res)
videos=[]
for i in data['data']:
videos.append({
'vod_id': i['vid'],
'vod_name': i['vname'],
'vod_pic': i['img'],
'vod_remarks': i['continu']
})
return {'list':videos}
def categoryContent(self, tid, pg, filter, extend):
params={'channel':tid,'type':extend.get('types',''),'area':extend.get('areas',''),'year':extend.get('years',''),'sort':extend.get('sorts','addtime'),'limit':'30','page':pg}
data=self.fetch(f'{self.host}/app/video/list',params=params,headers=self.getheaders()).text
data=self.getdata(data)
videos=[]
for i in data['data']['items']:
videos.append({
'vod_id': i.get('id'),
'vod_name': i.get('name'),
'vod_pic': i.get('pic'),
'vod_year': i.get('year'),
'vod_remarks': i.get('continu')
})
result = {}
result['list'] = videos
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def detailContent(self, ids):
data=self.fetch(f'{self.host}/app/video/detail?id={ids[0]}',headers=self.getheaders()).text
data=self.getdata(data)
v=data['data']
vod = {
'type_name': v.get('type'),
'vod_year': v.get('year'),
'vod_area': v.get('area'),
'vod_remarks': v.get('continu'),
'vod_actor': v.get('actor'),
'vod_director': v.get('director'),
'vod_content': v.get('content'),
'vod_play_from': '',
'vod_play_url': ''
}
parts,names = [],[]
for i in v['parts']:
names.append(i['play_zh'])
p=[]
for j,x in enumerate(i['part']):
params={'id':ids[0],'play':i['play'],'part':x}
p.append(f'{x}${self.e64(json.dumps(params))}')
parts.append('#'.join(p))
vod['vod_play_from'] = '$$$'.join(names)
vod['vod_play_url'] = '$$$'.join(parts)
return {'list':[vod]}
def searchContent(self, key, quick, pg="1"):
params={'key':key,'limit':'25','page':pg}
data=self.fetch(f'{self.host}/app/video/search',params=params,headers=self.getheaders()).text
data=self.getdata(data)
videos = []
for i in data['data']['items']:
videos.append({
'vod_id': i.get('id'),
'vod_name': i.get('name'),
'vod_pic': i.get('pic'),
'vod_year': i.get('year'),
'vod_remarks': i.get('continu')
})
return {'list':videos,'page':pg}
def playerContent(self, flag, id, vipFlags):
params= json.loads(self.d64(id))
data=self.fetch(f'{self.host}/app/video/play',params=params,headers=self.getheaders()).text
data=self.getdata(data)
urls=[]
for i in data['data']:
if i.get('url'):urls.extend([i['resolution'],i['url']])
return {'parse': 0, 'url': urls, 'header': {'User-Agent': 'Dart/3.6 (dart:io)'}}
def liveContent(self, url):
pass
def localProxy(self, param):
pass
def getheaders(self):
t=str(int(time.time() * 1000))
stinf=f"3.0.0.2-{t}-Android-1.0.4.5-{self.did}"
authentication=self.aes_encrypt(self.e64(stinf))
headers = {
'User-Agent': 'Dart/3.6 (dart:io)',
'x-version': '2020-09-17',
'appid': '4150439554430614',
'ts': t,
'authentication': authentication,
'content-type': 'application/json; charset=utf-8',
}
return headers
def aes_encrypt(self, text):
key = b'ziISjqkXPsGUMRNGyWigxDGtJbfTdcGv'
iv = b'WonrnVkxeIxDcFbv'
cipher = AES.new(key, AES.MODE_CBC, iv)
ct_bytes = cipher.encrypt(pad(text.encode("utf-8"), AES.block_size))
ct = b64encode(ct_bytes).decode("utf-8")
return ct
def aes_decrypt(self, key,text):
iv=key[::-1].encode("utf-8")
key=key.encode("utf-8")
cipher = AES.new(key, AES.MODE_CBC, iv)
pt = unpad(cipher.decrypt(b64decode(text)), AES.block_size)
return json.loads(pt.decode("utf-8"))
def rsa_decrypt(self, encrypted_data):
try:
private_key_string = '''-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEA5xpfniKIMYdjTytUBu5rsLbMtcCRW9B9DB78QEdf4wW5jO8r
Mw7j+/mYk3ghi0xrxpjtHm1R2KgNT1b0akJCExTH7gBVcjVywpmXdNXbcuCGfVCK
S6vYfMypmj5lNBgalCHe5AVc0ghhP3FG5j8Q5B7q00+tk4nT9nFsTmTeNcAKSH9h
aM6a0fbiJ3eXbxEr2o8raAjck10act35t/MIUOkcrQjHx5E9Yvqgs3qbq4yDakaG
4qfMAV4DAkkmdZ8N3fdEQ+rFJ67Spd4zzowj81+YO9wMUP2hNgfXmLOGLS5Lyi+x
vrwwWZXAIRUkhdQEAYQlhGs8wV9P4bJnTzplewIDAQABAoIBAEnRzNUwZpybiIdT
acXFBrUtzvoHhubzE955T04g/mn//CMeiogGq6BjO+9vIhfi01Jequ9bMBeqpoW/
WtdOTtjVfH9zr9eJZxzt/skdPrnVKmCBB4vgWoiSv2I7qAwZ3vOOVioz5FBayOWB
A4qsfnK/xXa2LtW/4usHk/b+lVRJZhHl3eKio2CnVBrgRb2DTx1GAwpvaRXp0oHm
LXDEtngxN4/rh2irPKgaG/lgrCBISKUHtwtgytcpltsHMASMXIKAjZjNgCA98fA3
te96U58wGHzQBQ5XtwTf0PiFEfJ7yOhgNRgCtiwsjGOhJFJFiiXYKzTef1GnVxPa
wuPc0TECgYEA+KCts3ArkWLqWbi4bVDpekP71geEnQIklSAk3RRZ0eiC1pmmkuTh
+q/4jOfoQHGuYCc8GvJqxQ8Y+aspPptbsAeRMSVovjQUvpRMqD0SWT8o3W2xGfqd
0W4p14CIF7oXjMqQVeY468AYzxUdNsaulrp9Wnpa5njzE5D5WGDu0IcCgYEA7fSq
kvz1oXjlljlskBwJ8gDB8j53PhuqV6Ori71G/qIGpYuOVjHSfPD/04a9T3M9olpk
vlLOLn7GS7xa4pjugmp0EDdxBIJJtTHbbi4NL4ZoYg+vHkiemkjGLis4x5qRKjg6
jNUEhnpksm68IUMSyO2toasfR0nVUmkb+ylKhG0CgYEAqNDZAJSyUHZcb21YdIlS
7rzIe2wBZGZ3FnaL8T0HO9rnM/WCQA1/Tys61doFPfSylQEu85EUZBc7OxM33xW3
7M9Gi5s+Ap/0Ue76GeXV1plnEuqPLPeZPwHREU1pmsq1gNhtppW6ooB9l+ZbPr0r
AJdB1DRuEj2ftvJiC9tNbHMCgYEAvHaliply6hrYq6x7gX/TmKpk8bnrs3Mx7Qui
WKDm09H8Na1cZIQ9U9uEo0H6OizpyeaSF/N5fXXHFEDwMrwxW3V4y0c96fZO7oW4
Z4FtzBBGKDSH3BJkG4o7/GEbLWwMQUYbiWNFnETf8DqoIif/fshQVtUzhsDBhe3d
zYUckdkCgYAJlTYhJz0qXcO8a5KsQ20/hEGRtOcq+mfPOdGYBOv6LB2ThuDKunbY
WsmAvqSo1qoJONnhQVMSpzKWEjCYV6hcifV9aeFofD4kNmG1gWC18QIYfrihLyOU
E4GDW7QN8HO2YiQpopGP/muKsIlCmxKP6DasgCCO36xs87Wi8gu1DA==
-----END RSA PRIVATE KEY-----'''
private_key = RSA.import_key(private_key_string)
cipher = PKCS1_v1_5.new(private_key)
encrypted_bytes = base64.b64decode(encrypted_data)
decrypted_bytes = cipher.decrypt(encrypted_bytes, None)
return decrypted_bytes.decode('utf-8')
except:
return ""
def getdata(self, data):
ds=data.split('.')
key=self.rsa_decrypt(ds[0])
result=self.aes_decrypt(key,ds[1])
return result
def getdid(self):
did=self.getCache('did')
if not did:
t = str(int(time.time()))
did = self.md5(t)
self.setCache('did', did)
return did
def e64(self, text):
try:
text_bytes = text.encode('utf-8')
encoded_bytes = b64encode(text_bytes)
return encoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64编码错误: {str(e)}")
return ""
def d64(self, encoded_text):
try:
encoded_bytes = encoded_text.encode('utf-8')
decoded_bytes = b64decode(encoded_bytes)
return decoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64解码错误: {str(e)}")
return ""
def md5(self, text):
h = MD5.new()
h.update(text.encode('utf-8'))
return h.hexdigest()

249
PY/MiFunP动漫.py Normal file
View File

@ -0,0 +1,249 @@
import re
import sys
import threading
import requests
from Crypto.Hash import MD5
sys.path.append("..")
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from urllib.parse import quote, urlparse
from base64 import b64encode, b64decode
import json
import time
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
self.host = self.gethost()
self.did=self.getdid()
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def action(self, action):
pass
def destroy(self):
pass
def homeContent(self, filter):
data = self.getdata("/api.php/getappapi.index/initV119")
dy = {"class": "类型", "area": "地区", "lang": "语言", "year": "年份", "letter": "字母", "by": "排序",
"sort": "排序"}
filters = {}
classes = []
json_data = data["type_list"]
homedata = data["banner_list"][8:]
for item in json_data:
if item["type_name"] == "全部":
continue
has_non_empty_field = False
jsontype_extend = json.loads(item["type_extend"])
homedata.extend(item["recommend_list"])
jsontype_extend["sort"] = "最新,最热,最赞"
classes.append({"type_name": item["type_name"], "type_id": item["type_id"]})
for key in dy:
if key in jsontype_extend and jsontype_extend[key].strip() != "":
has_non_empty_field = True
break
if has_non_empty_field:
filters[str(item["type_id"])] = []
for dkey in jsontype_extend:
if dkey in dy and jsontype_extend[dkey].strip() != "":
values = jsontype_extend[dkey].split(",")
value_array = [{"n": value.strip(), "v": value.strip()} for value in values if
value.strip() != ""]
filters[str(item["type_id"])].append({"key": dkey, "name": dy[dkey], "value": value_array})
result = {}
result["class"] = classes
result["filters"] = filters
result["list"] = homedata[1:]
return result
def homeVideoContent(self):
pass
def categoryContent(self, tid, pg, filter, extend):
body = {"area": extend.get('area', '全部'), "year": extend.get('year', '全部'), "type_id": tid, "page": pg,
"sort": extend.get('sort', '最新'), "lang": extend.get('lang', '全部'),
"class": extend.get('class', '全部')}
result = {}
data = self.getdata("/api.php/getappapi.index/typeFilterVodList", body)
result["list"] = data["recommend_list"]
result["page"] = pg
result["pagecount"] = 9999
result["limit"] = 90
result["total"] = 999999
return result
def detailContent(self, ids):
body = f"vod_id={ids[0]}"
data = self.getdata("/api.php/getappapi.index/vodDetail", body)
vod = data["vod"]
play = []
names = []
for itt in data["vod_play_list"]:
a = []
names.append(itt["player_info"]["show"])
for it in itt['urls']:
it['user_agent'] = itt["player_info"].get("user_agent")
it["parse"] = itt["player_info"].get("parse")
a.append(f"{it['name']}${self.e64(json.dumps(it))}")
play.append("#".join(a))
vod["vod_play_from"] = "$$$".join(names)
vod["vod_play_url"] = "$$$".join(play)
result = {"list": [vod]}
return result
def searchContent(self, key, quick, pg="1"):
body = f"keywords={key}&type_id=0&page={pg}"
data = self.getdata("/api.php/getappapi.index/searchList", body)
result = {"list": data["search_list"], "page": pg}
return result
def playerContent(self, flag, id, vipFlags):
ids = json.loads(self.d64(id))
h = {"User-Agent": (ids['user_agent'] or "okhttp/3.14.9")}
try:
if re.search(r'url=', ids['parse_api_url']):
data = self.fetch(ids['parse_api_url'], headers=h, timeout=10).json()
url = data.get('url') or data['data'].get('url')
else:
body = f"parse_api={ids.get('parse') or ids['parse_api_url'].replace(ids['url'], '')}&url={quote(self.aes(ids['url'], True))}&token={ids.get('token')}"
b = self.getdata("/api.php/getappapi.index/vodParse", body)['json']
url = json.loads(b)['url']
if 'error' in url: raise ValueError(f"解析失败: {url}")
p = 0
except Exception as e:
print('错误信息:', e)
url, p = ids['url'], 1
if re.search(r'\.jpg|\.png|\.jpeg', url):
url = self.Mproxy(url)
result = {}
result["parse"] = p
result["url"] = url
result["header"] = h
return result
def localProxy(self, param):
return self.Mlocal(param)
def gethost(self):
headers = {
'User-Agent': 'okhttp/3.14.9'
}
response = self.fetch('https://miget-1313189639.cos.ap-guangzhou.myqcloud.com/mifun.txt',headers=headers).text
return self.host_late(response.split('\n'))
def host_late(self, url_list):
if isinstance(url_list, str):
urls = [u.strip() for u in url_list.split(',')]
else:
urls = url_list
if len(urls) <= 1:
return urls[0] if urls else ''
results = {}
threads = []
def test_host(url):
try:
url = url.strip()
start_time = time.time()
response = requests.head(url, timeout=1.0, allow_redirects=False)
delay = (time.time() - start_time) * 1000
results[url] = delay
except Exception as e:
results[url] = float('inf')
for url in urls:
t = threading.Thread(target=test_host, args=(url,))
threads.append(t)
t.start()
for t in threads:
t.join()
return min(results.items(), key=lambda x: x[1])[0]
def getdid(self):
did=self.getCache('did')
if not did:
t = str(int(time.time()))
did = self.md5(t)
self.setCache('did', did)
return did
def aes(self, text, b=None):
key = b"GETMIFUNGEIMIFUN"
cipher = AES.new(key, AES.MODE_CBC, key)
if b:
ct_bytes = cipher.encrypt(pad(text.encode("utf-8"), AES.block_size))
ct = b64encode(ct_bytes).decode("utf-8")
return ct
else:
pt = unpad(cipher.decrypt(b64decode(text)), AES.block_size)
return pt.decode("utf-8")
def header(self):
t = str(int(time.time()))
header = {"Referer": self.host,
"User-Agent": "okhttp/3.14.9", "app-version-code": "516", "app-ui-mode": "light",
"app-api-verify-time": t, "app-user-device-id": self.did,
"app-api-verify-sign": self.aes(t, True),
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"}
return header
def getdata(self, path, data=None):
vdata = self.post(f"{self.host}{path}", headers=self.header(), data=data, timeout=10).json()['data']
data1 = self.aes(vdata)
return json.loads(data1)
def Mproxy(self, url):
return f"{self.getProxyUrl()}&url={self.e64(url)}&type=m3u8"
def Mlocal(self, param, header=None):
url = self.d64(param["url"])
ydata = self.fetch(url, headers=header, allow_redirects=False)
data = ydata.content.decode('utf-8')
if ydata.headers.get('Location'):
url = ydata.headers['Location']
data = self.fetch(url, headers=header).content.decode('utf-8')
parsed_url = urlparse(url)
durl = parsed_url.scheme + "://" + parsed_url.netloc
lines = data.strip().split('\n')
for index, string in enumerate(lines):
if '#EXT' not in string and 'http' not in string:
last_slash_index = string.rfind('/')
lpath = string[:last_slash_index + 1]
lines[index] = durl + ('' if lpath.startswith('/') else '/') + lpath
data = '\n'.join(lines)
return [200, "application/vnd.apple.mpegur", data]
def e64(self, text):
try:
text_bytes = text.encode('utf-8')
encoded_bytes = b64encode(text_bytes)
return encoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64编码错误: {str(e)}")
return ""
def d64(self, encoded_text):
try:
encoded_bytes = encoded_text.encode('utf-8')
decoded_bytes = b64decode(encoded_bytes)
return decoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64解码错误: {str(e)}")
return ""
def md5(self, text):
h = MD5.new()
h.update(text.encode('utf-8'))
return h.hexdigest()

263
PY/wawa.py Normal file
View File

@ -0,0 +1,263 @@
# -*- coding: utf-8 -*-
# by @嗷呜
import json
import sys
import time
import uuid
from base64 import b64decode, b64encode
from concurrent.futures import ThreadPoolExecutor, as_completed
from Crypto.Cipher import AES
from Crypto.Hash import SHA256, MD5
from Crypto.PublicKey import RSA
from Crypto.Signature import pkcs1_15
from Crypto.Util.Padding import unpad
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
self.host, self.appKey, self.rsakey = self.userinfo()
pass
def getName(self):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
def homeContent(self, filter):
data = self.fetch(f"{self.host}/api.php/zjv6.vod/types", headers=self.getheader()).json()
dy = {"class": "类型", "area": "地区", "lang": "语言", "year": "年份", "letter": "字母", "by": "排序", }
filters = {}
classes = []
json_data = data['data']['list']
for item in json_data:
has_non_empty_field = False
jsontype_extend = item["type_extend"]
jsontype_extend['by'] = '按更新,按播放,按评分,按收藏'
classes.append({"type_name": item["type_name"], "type_id": item["type_id"]})
for key in dy:
if key in jsontype_extend and jsontype_extend[key].strip() != "":
has_non_empty_field = True
break
if has_non_empty_field:
filters[str(item["type_id"])] = []
for dkey in jsontype_extend:
if dkey in dy and jsontype_extend[dkey].strip() != "":
values = jsontype_extend[dkey].split(",")
sl = {'按更新': 'time', '按播放': 'hits', '按评分': 'score', '按收藏': 'store_num'}
value_array = [
{"n": value.strip(), "v": sl[value.strip()] if dkey == "by" else value.strip()}
for value in values
if value.strip() != ""
]
filters[str(item["type_id"])].append(
{"key": dkey, "name": dy[dkey], "value": value_array}
)
result = {"class": classes, "filters": filters}
return result
def homeVideoContent(self):
data = self.fetch(f"{self.host}/api.php/zjv6.vod/vodPhbAll", headers=self.getheader()).json()
return {'list': data['data']['list'][0]['vod_list']}
def categoryContent(self, tid, pg, filter, extend):
params = {
"type": tid,
"class": extend.get('class', ''),
"lang": extend.get('lang', ''),
"area": extend.get('area', ''),
"year": extend.get('year', ''),
"by": extend.get('by', ''),
"page": pg,
"limit": "12"
}
data = self.fetch(f"{self.host}/api.php/zjv6.vod", headers=self.getheader(), params=params).json()
result = {}
result['list'] = data['data']['list']
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def detailContent(self, ids):
data = self.fetch(f"{self.host}/api.php/zjv6.vod/detail?vod_id={ids[0]}&rel_limit=10",
headers=self.getheader()).json()
vod = data['data']
v, np = {'vod_play_from': [], 'vod_play_url': []}, {}
for i in vod['vod_play_list']:
n = i['player_info']['show']
np[n] = []
for j in i['urls']:
j['parse'] = i['player_info']['parse2']
nm = j.pop('name')
np[n].append(f"{nm}${self.e64(json.dumps(j))}")
for key, value in np.items():
v['vod_play_from'].append(key)
v['vod_play_url'].append('#'.join(value))
v['vod_play_from'] = '$$$'.join(v['vod_play_from'])
v['vod_play_url'] = '$$$'.join(v['vod_play_url'])
vod.update(v)
vod.pop('vod_play_list', None)
vod.pop('type', None)
return {'list': [vod]}
def searchContent(self, key, quick, pg="1"):
data = self.fetch(f"{self.host}/api.php/zjv6.vod?page={pg}&limit=20&wd={key}", headers=self.getheader()).json()
return {'list': data['data']['list'], 'page': pg}
def playerContent(self, flag, id, vipFlags):
ids = json.loads(self.d64(id))
target_url = ids['url']
try:
parse_str = ids.get('parse', '')
if parse_str:
parse_urls = parse_str.split(',')
result_url = self.try_all_parses(parse_urls, target_url)
if result_url:
return {
'parse': 0,
'url': result_url,
'header': {'User-Agent': 'dart:io'}
}
return {
'parse': 1,
'url': target_url,
'header': {'User-Agent': 'dart:io'}
}
except Exception as e:
print(e)
return {
'parse': 1,
'url': target_url,
'header': {'User-Agent': 'dart:io'}
}
def liveContent(self, url):
pass
def localProxy(self, param):
pass
def userinfo(self):
t = str(int(time.time() * 1000))
uid = self.generate_uid()
sign = self.md5(f"appKey=3bbf7348cf314874883a18d6b6fcf67a&uid={uid}&time={t}")
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.106 Safari/537.36',
'Connection': 'Keep-Alive',
'appKey': '3bbf7348cf314874883a18d6b6fcf67a',
'uid': uid,
'time': t,
'sign': sign,
}
params = {
'access_token': '74d5879931b9774be10dee3d8c51008e',
}
response = self.fetch('https://gitee.com/api/v5/repos/aycapp/openapi/contents/wawaconf.txt', params=params,
headers=headers).json()
data = json.loads(self.decrypt(response['content']))
return data['baseUrl'], data['appKey'], data['appSecret']
def e64(self, text):
try:
text_bytes = text.encode('utf-8')
encoded_bytes = b64encode(text_bytes)
return encoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64编码错误: {str(e)}")
return ""
def d64(self, encoded_text):
try:
encoded_bytes = encoded_text.encode('utf-8')
decoded_bytes = b64decode(encoded_bytes)
return decoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64解码错误: {str(e)}")
return ""
def md5(self, text):
h = MD5.new()
h.update(text.encode('utf-8'))
return h.hexdigest()
def generate_uid(self):
return uuid.uuid4().hex
def getheader(self):
t = str(int(time.time() * 1000))
uid = self.generate_uid()
sign = self.sign_message(f"appKey={self.appKey}&time={t}&uid={uid}")
headers = {
'User-Agent': 'okhttp/4.9.3',
'Connection': 'Keep-Alive',
'uid': uid,
'time': t,
'appKey': self.appKey,
'sign': sign,
}
return headers
def decrypt(self, encrypted_data):
key = b64decode('Crm4FXWkk5JItpYirFDpqg==')
cipher = AES.new(key, AES.MODE_ECB)
encrypted = bytes.fromhex(self.d64(encrypted_data))
decrypted = cipher.decrypt(encrypted)
unpadded = unpad(decrypted, AES.block_size)
return unpadded.decode('utf-8')
def sign_message(self, message):
private_key_str = f"-----BEGIN PRIVATE KEY-----\n{self.rsakey}\n-----END PRIVATE KEY-----"
private_key = RSA.import_key(private_key_str)
message_hash = SHA256.new(message.encode('utf-8'))
signature = pkcs1_15.new(private_key).sign(message_hash)
signature_b64 = b64encode(signature).decode('utf-8')
return signature_b64
def fetch_url(self, parse_url, target_url):
try:
response = self.fetch(f"{parse_url.replace('..', '.')}{target_url}",
headers={"user-agent": "okhttp/4.1.0/luob.app"}, timeout=5)
if response.status_code == 200:
try:
data = response.json()
result_url = data.get('url') or data.get('data', {}).get('url')
if result_url:
return result_url
except:
pass
return None
except:
return None
def try_all_parses(self, parse_urls, target_url):
with ThreadPoolExecutor(max_workers=(len(parse_urls))) as executor:
future_to_url = {
executor.submit(self.fetch_url, parse_url.strip(), target_url): parse_url
for parse_url in parse_urls if parse_url.strip()
}
for future in as_completed(future_to_url):
try:
result = future.result()
if result:
return result
except:
continue
return None

128
PY/好帅短剧.py Normal file
View File

@ -0,0 +1,128 @@
# -*- coding: utf-8 -*-
# by @嗷呜
import json
import sys
sys.path.append('..')
from base.spider import Spider
from pyquery import PyQuery as pq
class Spider(Spider):
def init(self, extend=""):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def action(self, action):
pass
def destroy(self):
pass
host='https://www.nhsyy.com'
headers = {
'Accept': '*/*',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'DNT': '1',
'Origin': host,
'Pragma': 'no-cache',
'Referer': f'{host}/',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'cross-site',
'User-Agent': 'Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="130", "Google Chrome";v="130"',
'sec-ch-ua-mobile': '?1',
'sec-ch-ua-platform': '"Android"',
}
def homeContent(self, filter):
data = pq(self.fetch(self.host, headers=self.headers).text)
result = {}
classes = []
for i in data('.drop-content-items li').items():
j = i('a').attr('href')
if j and 'type' in j:
id = j.split('/')[-1].split('.')[0]
classes.append({
'type_name': i('a').text(),
'type_id': id
})
hlist = self.getlist(data('.module-lines-list .module-item'))
result['class'] = classes
result['list'] = hlist
return result
def homeVideoContent(self):
pass
def categoryContent(self, tid, pg, filter, extend):
data = self.fetch(f'{self.host}/vodshwo/{tid}--------{pg}---.html', headers=self.headers).text
vlist = self.getlist(pq(data)('.module-list .module-item'))
return {"list": vlist, "page": pg, "pagecount": 9999, "limit": 90, "total": 999999}
def detailContent(self, ids):
data = pq(self.fetch(f"{self.host}{ids[0]}", headers=self.headers).text)
udata = data('.scroll-box-y .scroll-content a')
vdata = data('.video-info-main .video-info-item')
vod = {
'vod_year': vdata.eq(2)('div').text(),
'vod_remarks': vdata.eq(3)('div').text(),
'vod_actor': vdata.eq(1)('a').text(),
'vod_director': vdata.eq(0)('a').text(),
'typt_name': data('.video-info-aux a').eq(0).attr('title'),
'vod_content': vdata.eq(4)('p').eq(-1).text(),
'vod_play_from': '嗷呜爱看短剧',
'vod_play_url': '#'.join([f"{i.text()}${i.attr('href')}" for i in udata.items()]),
}
result = {"list": [vod]}
return result
def searchContent(self, key, quick, pg="1"):
dlist = self.fetch(f'{self.host}/vodsearch/{key}----------{pg}---.html', headers=self.headers).text
ldata = pq(dlist)('.module-list .module-search-item')
vlist = []
for i in ldata.items():
img = i('.module-item-pic')
vlist.append({
'vod_id': i('.video-serial').attr('href'),
'vod_name': img('img').attr('alt'),
'vod_pic': img('img').attr('data-src'),
'vod_year': i('.tag-link a').eq(0).text(),
'vod_remarks': i('.video-serial').text()
})
result = {"list": vlist, "page": pg}
return result
def playerContent(self, flag, id, vipFlags):
data=self.fetch(f"{self.host}{id}", headers=self.headers).text
jstr = pq(data)('.player-wrapper script').eq(0).text()
try:
jdata = json.loads(jstr.split('=', 1)[-1])
url = jdata.get('url') or jdata.get('next_url')
p=0
except:
url,p = f"{self.host}{id}",1
return {'parse': p, 'url': url, 'header': self.headers}
def localProxy(self, param):
pass
def getlist(self, data):
vlist = []
for i in data.items():
img = i('.module-item-pic')
vlist.append({
'vod_id': img('a').attr('href'),
'vod_name': img('img').attr('alt'),
'vod_pic': img('img').attr('data-src'),
'vod_remarks': i('.module-item-text').text()
})
return vlist

166
PY/爱瓜(代理).py Normal file
View File

@ -0,0 +1,166 @@
# -*- coding: utf-8 -*-
# by @嗷呜
# 温馨提示:搜索只能搜拼音联想
# 播放需要挂代理
import sys
import time
import uuid
from Crypto.Hash import MD5
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
self.uid = self.getuid()
self.token, self.code = self.getuserinfo()
pass
def getName(self):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
host = 'https://tvapi211.magicetech.com'
headers = {'User-Agent': 'okhttp/3.11.0'}
def homeContent(self, filter):
body = {'token': self.token, 'authcode': self.code}
data = self.post(f'{self.host}/hr_1_1_0/apptvapi/web/index.php/video/filter-header', json=self.getbody(body),
headers=self.headers).json()
result = {}
classes = []
filters = {}
for k in data['data']:
classes.append({
'type_name': k['channel_name'],
'type_id': str(k['channel_id']),
})
filters[str(k['channel_id'])] = []
for i in k['search_box']:
if len(i['list']):
filters[str(k['channel_id'])].append({
'key': i['field'],
'name': i['label'],
'value': [{'n': j['display'], 'v': str(j['value'])} for j in i['list'] if j['value']]
})
result['class'] = classes
result['filters'] = filters
return result
def homeVideoContent(self):
body = {'token': self.token, 'authcode': self.code}
data = self.post(f'{self.host}/hr_1_1_0/apptvapi/web/index.php/video/index-tv', json=self.getbody(body),
headers=self.headers).json()
return {'list': self.getlist(data['data'][0]['banner'])}
def categoryContent(self, tid, pg, filter, extend):
body = {'token': self.token, 'authcode': self.code, 'channel_id': tid, 'area': extend.get('area', '0'),
'year': extend.get('year', '0'), 'sort': extend.get('sort', '0'), 'tag': extend.get('tag', 'hot'),
'status': extend.get('status', '0'), 'page_num': pg, 'page_size': '24'}
data = self.post(f'{self.host}/hr_1_1_0/apptvapi/web/index.php/video/filter-video', json=self.getbody(body),
headers=self.headers).json()
result = {}
result['list'] = self.getlist(data['data']['list'])
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def detailContent(self, ids):
ids = ids[0].split('@')
body = {'token': self.token, 'authcode': self.code, 'channel_id': ids[0], 'video_id': ids[1]}
data = self.post(f'{self.host}/hr_1_1_0/apptvapi/web/index.php/video/detail', json=self.getbody(body),
headers=self.headers).json()
vdata = {}
for k in data['data']['chapters']:
i = k['sourcelist']
for j in i:
if j['source_name'] not in vdata: vdata[j['source_name']] = []
vdata[j['source_name']].append(f"{k['title']}${j['source_url']}")
plist, names = [], []
for key, value in vdata.items():
names.append(key)
plist.append('#'.join(value))
vod = {
'vod_play_from': '$$$'.join(names),
'vod_play_url': '$$$'.join(plist),
}
return {'list': [vod]}
def searchContent(self, key, quick, pg="1"):
body = {'token': self.token, 'authcode': self.code, 'keyword': key, 'page_num': pg}
data = self.post(f'{self.host}/hr_1_1_0/apptvapi/web/index.php/search/letter-result', json=self.getbody(body),
headers=self.headers).json()
return {'list': self.getlist(data['data']['list'])}
def playerContent(self, flag, id, vipFlags):
# https://rysp.tv
# https://aigua.tv
result = {
"parse": 0,
"url": id,
"header": {
"User-Agent": "Dalvik/2.1.0 (Linux; U; Android 11; M2012K10C Build/RP1A.200720.011)",
"Origin": "https://aigua.tv",
"Referer": "https://aigua.tv/"
}
}
return result
def localProxy(self, param):
pass
def getuserinfo(self):
data = self.post(f'{self.host}/hr_1_1_0/apptvapi/web/index.php/user/auth-login', json=self.getbody(),
headers=self.headers).json()
v = data['data']
return v['user_token'], v['authcode']
def getuid(self):
uid = self.getCache('uid')
if not uid:
uid = str(uuid.uuid4())
self.setCache('uid', uid)
return uid
def getbody(self, json_data=None):
if json_data is None: json_data = {}
params = {"product": "4", "ver": "1.1.0", "debug": "1", "appId": "1", "osType": "3", "marketChannel": "tv",
"sysVer": "11", "time": str(int(time.time())), "packageName": "com.gzsptv.gztvvideo",
"udid": self.uid, }
json_data.update(params)
sorted_json = dict(sorted(json_data.items(), key=lambda item: item[0]))
text = '&'.join(f"{k}={v}" for k, v in sorted_json.items() if v != '')
md5_hash = self.md5(f"jI7POOBbmiUZ0lmi{text}D9ShYdN51ksWptpkTu11yenAJu7Zu3cR").upper()
json_data.update({'sign': md5_hash})
return json_data
def md5(self, text):
h = MD5.new()
h.update(text.encode('utf-8'))
return h.hexdigest()
def getlist(self, data):
videos = []
for i in data:
if type(i.get('video')) == dict: i = i['video']
videos.append({
'vod_id': f"{i.get('channel_id')}@{i.get('video_id')}",
'vod_name': i.get('video_name'),
'vod_pic': i.get('cover'),
'vod_year': i.get('score'),
'vod_remarks': i.get('flag'),
})
return videos