Update 4k影院.py

maoystv 2025-04-29 21:24:27 +08:00 committed by GitHub
parent d11e790df3
commit 997be1af11
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194


@@ -1,32 +1,17 @@
-# 作者:老王叔叔
-import requests
-from bs4 import BeautifulSoup
-import re
+# -*- coding: utf-8 -*-
+# by @嗷呜
 import sys
-import json
-import base64
-import urllib.parse
+from pyquery import PyQuery as pq
 sys.path.append('..')
 from base.spider import Spider
-
-xurl = "https://www.4kvm.net"
-
-headerx = {
-    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36 Edg/129.0.0.0'}
-
-pm = ''
 
 class Spider(Spider):
-    global xurl
-    global headerx
+    def init(self, extend=""):
+        pass
 
     def getName(self):
-        return "首页"
-
-    def init(self, extend):
         pass
 
     def isVideoFormat(self, url):
@@ -35,157 +20,40 @@ class Spider(Spider):
     def manualVideoCheck(self):
         pass
 
-    def extract_middle_text(self, text, start_str, end_str, pl, start_index1: str = '', end_index2: str = ''):
-        if pl == 3:
-            plx = []
-            while True:
-                start_index = text.find(start_str)
-                if start_index == -1:
-                    break
-                end_index = text.find(end_str, start_index + len(start_str))
-                if end_index == -1:
-                    break
-                middle_text = text[start_index + len(start_str):end_index]
-                plx.append(middle_text)
-                text = text.replace(start_str + middle_text + end_str, '')
-            if len(plx) > 0:
-                purl = ''
-                for i in range(len(plx)):
-                    matches = re.findall(start_index1, plx[i])
-                    output = ""
-                    for match in matches:
-                        match3 = re.search(r'(?:^|[^0-9])(\d+)(?:[^0-9]|$)', match[1])
-                        if match3:
-                            number = match3.group(1)
-                        else:
-                            number = 0
-                        if 'http' not in match[0]:
-                            output += f"#{match[1]}${number}{xurl}{match[0]}"
-                        else:
-                            output += f"#{match[1]}${number}{match[0]}"
-                    output = output[1:]
-                    purl = purl + output + "$$$"
-                purl = purl[:-3]
-                return purl
-            else:
-                return ""
-        else:
-            start_index = text.find(start_str)
-            if start_index == -1:
-                return ""
-            end_index = text.find(end_str, start_index + len(start_str))
-            if end_index == -1:
-                return ""
-            if pl == 0:
-                middle_text = text[start_index + len(start_str):end_index]
-                return middle_text.replace("\\", "")
-            if pl == 1:
-                middle_text = text[start_index + len(start_str):end_index]
-                matches = re.findall(start_index1, middle_text)
-                if matches:
-                    jg = ' '.join(matches)
-                    return jg
-            if pl == 2:
-                middle_text = text[start_index + len(start_str):end_index]
-                matches = re.findall(start_index1, middle_text)
-                if matches:
-                    new_list = [f'{item}' for item in matches]
-                    jg = '$$$'.join(new_list)
-                    return jg
+    def destroy(self):
+        pass
+
+    headers = {
+        'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
+        'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
+        'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="134", "Google Chrome";v="134"',
+        'sec-ch-ua-platform': '"macOS"',
+        'sec-fetch-dest': 'document',
+        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36',
+    }
+
+    host = "https://4k-av.com"
 
     def homeContent(self, filter):
+        data = self.getpq()
         result = {}
-        result = {"class": [{"type_id": "movies", "type_name": "电影"},
-                            {"type_id": "tvshows", "type_name": "剧集"},
-                            {"type_id": "trending", "type_name": "热门"}]}
+        classes = []
+        for k in list(data('#category ul li').items())[:-1]:
+            classes.append({
+                'type_name': k.text(),
+                'type_id': k('a').attr('href')
+            })
+        result['class'] = classes
+        result['list'] = self.getlist(data('#MainContent_scrollul ul li'), '.poster span')
         return result
 
     def homeVideoContent(self):
-        videos = []
-        try:
-            detail = requests.get(url=xurl, headers=headerx)
-            detail.encoding = "utf-8"
-            res = detail.text
-            doc = BeautifulSoup(res, "lxml")
-            soups = doc.find_all('div', class_="items")
-            for soup in soups:
-                vods = soup.find_all('article', class_="item")
-                for vod in vods:
-                    names = vod.find('div', class_="poster")
-                    name = names.find('img')['alt']
-                    ids = vod.find('div', class_="poster")
-                    id = ids.find('a')['href']
-                    pics = vod.find('div', class_="poster")
-                    pic = pics.find('img')['src']
-                    remark = self.extract_middle_text(str(vod), '<div class="upinfo">', '</div>', 0)
-                    video = {
-                        "vod_id": id,
-                        "vod_name": name,
-                        "vod_pic": pic,
-                        "vod_remarks": remark
-                    }
-                    videos.append(video)
-            result = {'list': videos}
-            return result
-        except:
-            pass
+        pass
 
-    def categoryContent(self, cid, pg, filter, ext):
+    def categoryContent(self, tid, pg, filter, extend):
+        data = self.getpq(f"{tid}page-{pg}.html")
         result = {}
-        if pg:
-            page = int(pg)
-        else:
-            page = 1
-        page = int(pg)
-        videos = []
-        if page == '1':
-            url = f'{xurl}/{cid}'
-        else:
-            url = f'{xurl}/{cid}/page/{str(page)}'
-        try:
-            detail = requests.get(url=url, headers=headerx)
-            detail.encoding = "utf-8"
-            res = detail.text
-            doc = BeautifulSoup(res, "lxml")
-            soups = doc.find_all('div', class_="poster")
-            for vod in soups:
-                name = vod.find('img')['alt']
-                id = vod.find('a')['href']
-                pic = vod.find('img')['src']
-                remark = self.extract_middle_text(str(vod), 'class="update">', '</div>', 0)
-                video = {
-                    "vod_id": id,
-                    "vod_name": name,
-                    "vod_pic": pic,
-                    "vod_remarks": remark
-                }
-                videos.append(video)
-        except:
-            pass
-        result = {'list': videos}
+        result['list'] = self.getlist(data('#MainContent_newestlist .virow .NTMitem'))
         result['page'] = pg
         result['pagecount'] = 9999
         result['limit'] = 90
@@ -193,135 +61,71 @@ class Spider(Spider):
         return result
 
     def detailContent(self, ids):
-        global pm
-        did = ids[0]
-        result = {}
-        videos = []
-        playurl = ''
-        if 'http' not in did:
-            did = xurl + did
-        if 'tvshows' in did or 'trending' in did:
-            tiaozhuan = '1'
-        elif 'movies' in did:
-            tiaozhuan = '2'
-        if tiaozhuan == '1':
-            res1 = requests.get(url=did, headers=headerx)
-            res1.encoding = "utf-8"
-            res = res1.text
-            did = self.extract_middle_text(res, '<h2>播放列表</h2>', '</div>', 1, 'href="(.*?)"')
-            if 'http' not in did:
-                did = xurl + did
-            res1 = requests.get(url=did, headers=headerx)
-            res1.encoding = "utf-8"
-            res = res1.text
-            bofang = self.extract_middle_text(res, "videourls:[", "],", 0)
-            id = self.extract_middle_text(res, "t single single-seasons postid-", '"', 0)
-            array = json.loads(bofang)
-            purl = ''
-            for vod in array:
-                name = vod['name']
-                js = vod['url']
-                purl = purl + str(name) + '$' + xurl + '/artplayer?id=' + id + '&source=0&ep=' + str(js) + '#'
-            purl = purl[:-1]
-        if tiaozhuan == '2':
-            res1 = requests.get(url=did, headers=headerx)
-            res1.encoding = "utf-8"
-            res = res1.text
-            id = self.extract_middle_text(res, "single single-movies postid-", '"', 0)
-            name = self.extract_middle_text(res, "</i><span class='title'>", '</span>', 0)
-            purl = ''
-            purl = purl + str(name) + '$' + xurl + '/artplayer?mvsource=0&id=' + id + '&type=hls'
-        content = self.extract_middle_text(res, '<p>', '</p>', 0)
-        videos.append({
-            "vod_id": did,
-            "vod_actor": '',
-            "vod_director": '',
-            "vod_content": content,
-            "vod_play_from": '專線',
-            "vod_play_url": purl
-        })
-        result['list'] = videos
-        return result
+        data = self.getpq(ids[0])
+        v = data('#videoinfo')
+        vod = {
+            'type_name': v('#MainContent_tags.tags a').text(),
+            'vod_year': v('#MainContent_videodetail.videodetail a').text(),
+            'vod_remarks': v('#MainContent_titleh12 h2').text(),
+            'vod_content': v('p.cnline').text(),
+            'vod_play_from': '4KAV',
+            'vod_play_url': ''
+        }
+        vlist = data('#rtlist li')
+        if vlist:
+            c = [f"{i('span').text()}${i('a').attr('href')}" for i in list(vlist.items())[1:]]
+            c.insert(0, f"{vlist.eq(0)('span').text()}${ids[0]}")
+            vod['vod_play_url'] = '#'.join(c)
+        else:
+            vod['vod_play_url'] = f"{data('#tophead h1').text()}${ids[0]}"
+        return {'list': [vod]}
+
+    def searchContent(self, key, quick, pg="1"):
+        data = self.getpq(f"/s?k={key}")
+        return {'list': self.getlist(data('#MainContent_newestlist .virow.search .NTMitem.Main'))}
 
     def playerContent(self, flag, id, vipFlags):
-        parts = id.split("http")
-        xiutan = 0
-        if xiutan == 0:
-            if len(parts) > 1:
-                before_https, after_https = parts[0], 'http' + parts[1]
-                res = requests.get(url=after_https, headers=headerx)
-                res = res.text
-                url = self.extract_middle_text(res, 'fetch("', '"', 0).replace('\\', '')
-        result = {}
-        result["parse"] = xiutan
-        result["playUrl"] = ''
-        result["url"] = url
-        result["header"] = headerx
-        return result
+        try:
+            data = self.getpq(id)
+            p, url = 0, data('#MainContent_videowindow source').attr('src')
+            if not url: raise Exception("未找到播放地址")
+        except Exception as e:
+            p, url = 1, f"{self.host}{id}"
+        headers = {
+            'origin': self.host,
+            'referer': f'{self.host}/',
+            'sec-ch-ua-platform': '"macOS"',
+            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36',
+        }
+        return {'parse': p, 'url': url, 'header': headers}
 
-    def searchContentPage(self, key, quick, page):
-        result = {}
-        videos = []
-        if not page:
-            page = '1'
-        if page == '1':
-            url = f'{xurl}/xssearch?s={key}'
-        else:
-            url = f'{xurl}/xssearch?s={key}'
-        detail = requests.get(url=url, headers=headerx)
-        detail.encoding = "utf-8"
-        res = detail.text
-        doc = BeautifulSoup(res, "lxml")
-        soups = doc.find_all('div', class_="result-item")
-        for vod in soups:
-            names = vod.find('div', class_="title")
-            name = names.find('a').text
-            id = vod.find('a')['href']
-            pic = vod.find('img')['src']
-            remark = self.extract_middle_text(str(vod), 'class="rating">', '</span>', 0)
-            video = {
-                "vod_id": id,
-                "vod_name": name,
-                "vod_pic": pic,
-                "vod_remarks": remark
-            }
-            videos.append(video)
-        result['list'] = videos
-        result['page'] = page
-        result['pagecount'] = 9999
-        result['limit'] = 90
-        result['total'] = 999999
-        return result
-
-    def searchContent(self, key, quick):
-        return self.searchContentPage(key, quick, '1')
-
-    def localProxy(self, params):
-        if params['type'] == "m3u8":
-            return self.proxyM3u8(params)
-        elif params['type'] == "media":
-            return self.proxyMedia(params)
-        elif params['type'] == "ts":
-            return self.proxyTs(params)
-        return None
+    def localProxy(self, param):
+        pass
+
+    def liveContent(self, url):
+        pass
+
+    def getlist(self, data, y='.resyear label[title="分辨率"]'):
+        videos = []
+        for i in data.items():
+            ns = i('.title h2').text().split(' ')
+            videos.append({
+                'vod_id': i('.title a').attr('href'),
+                'vod_name': ns[0],
+                'vod_pic': i('.poster img').attr('src'),
+                'vod_remarks': ns[-1] if len(ns) > 1 else '',
+                'vod_year': i(y).text()
+            })
+        return videos
+
+    def getpq(self, path=''):
+        url = f"{self.host}{path}"
+        data = self.fetch(url, headers=self.headers).text
+        try:
+            return pq(data)
+        except Exception as e:
+            print(f"{str(e)}")
+            return pq(data.encode('utf-8'))
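
Note: the sketch below is a minimal, self-contained illustration of how the new pyquery-based getlist() parsing behaves, for anyone reviewing the commit without the site at hand. The SAMPLE markup is an assumption: the class names are copied from the selectors above, but the element structure is guessed and is not 4k-av.com's actual page.

# -*- coding: utf-8 -*-
# Standalone sketch of the getlist() logic added in this commit.
# SAMPLE is hypothetical markup; only the CSS class names come from the diff.
from pyquery import PyQuery as pq

SAMPLE = """
<div class="NTMitem">
  <div class="poster"><img src="/img/demo.jpg"></div>
  <div class="title"><a href="/v/123.html"><h2>示例影片 4K HDR</h2></a></div>
  <div class="resyear"><label title="分辨率">2160P</label></div>
</div>
"""

def getlist(data, y='.resyear label[title="分辨率"]'):
    videos = []
    for i in data.items():
        # Title text is split on spaces: first token becomes the name,
        # the last token (if any) becomes the remark.
        ns = i('.title h2').text().split(' ')
        videos.append({
            'vod_id': i('.title a').attr('href'),
            'vod_name': ns[0],
            'vod_pic': i('.poster img').attr('src'),
            'vod_remarks': ns[-1] if len(ns) > 1 else '',
            'vod_year': i(y).text()
        })
    return videos

if __name__ == '__main__':
    doc = pq(SAMPLE)
    # Expected output (given the assumed markup):
    # [{'vod_id': '/v/123.html', 'vod_name': '示例影片', 'vod_pic': '/img/demo.jpg',
    #   'vod_remarks': 'HDR', 'vod_year': '2160P'}]
    print(getlist(doc('.NTMitem')))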