阿里云盘任务脚本(2025 年 2 月版)
const axios = require("axios")
refresh_token = config.aliyun.refresh_token
var token = ""
function aliyun() {
return new Promise(async (resolve) => {
try {
const url = 'https://auth.aliyundrive.com/v2/account/token';
const headers = {
"Content-Type": "application/json; charset=UTF-8",
};
const data = {"grant_type":"refresh_token",
"app_id":"pJZInNHN2dZWk8qg",
"refresh_token":`${refresh_token}`
};
let res = await axios.post(url, data, {headers});
if (res.data.code == 'InvalidParameter.RefreshToken' || res.data.code == 'RefreshTokenExpired') {
console.log(`token 刷新失败,${res.data.message}`);
msg = "token 刷新失败"
}
else{
const name = res.data.nick_name
token = res.data.access_token
// console.log(name,token)
await sign(token, name)
}
} catch (error) {
console.log(error);
msg = "token 接口请求失败";
}
resolve("【阿里云盘】:" + msg || "正常运行了");
})
}
async function sign(token, name) {
return new Promise(async (resolve) => {
try {
const url = 'https://member.aliyundrive.com/v1/activity/sign_in_list';
const headers = {
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 16_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/20D5024e iOS16.3 (iPhone15,2;zh-Hans-CN) App/4.1.3 AliApp(yunpan/4.1.3) com.alicloud.smartdrive/28278449 Channel/201200 AliApp(AYSD/4.1.3) com.alicloud.smartdrive/4.1.3 Version/16.3 Channel/201200 Language/zh-Hans-CN /iOS Mobile/iPhone15,2 language/zh-Hans-CN",
Authorization: "Bearer "+token
}
const data = {"isReward": false}
let res = await axios.post(url, data, {headers})
// const res = await axios.post('https://member.aliyundrive.com/v1/activity/sign_in_list', {
// isReward: false},{
// headers:{
// "Content-Type": "application/json",
// "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 16_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/20D5024e iOS16.3 (iPhone15,2;zh-Hans-CN) App/4.1.3 AliApp(yunpan/4.1.3) com.alicloud.smartdrive/28278449 Channel/201200 AliApp(AYSD/4.1.3) com.alicloud.smartdrive/4.1.3 Version/16.3 Channel/201200 Language/zh-Hans-CN /iOS Mobile/iPhone15,2 language/zh-Hans-CN",
// Authorization: "Bearer "+token
// }
// })
if (res.data.success) {
console.log(`${name},已连续签到${res.data.result.signInCount}天!`)
const day = res.data.result.signInCount
await reward(day)
msg = `${name},已连续签到${res.data.result.signInCount}天!`
}
else {
console.log(`${name},签到失败,${res.data.message}!`)
msg = "签到失败"
}
} catch (error) {
console.log(error)
msg = "签到接口请求失败"
}
resolve("【阿里云盘】:" + msg || "正常运行了")
});
}
async function reward(day){
return new Promise(async (resolve) => {
try {
const url = 'https://member.aliyundrive.com/v1/activity/sign_in_reward';
const headers = {
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 16_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/20D5024e iOS16.3 (iPhone15,2;zh-Hans-CN) App/4.1.3 AliApp(yunpan/4.1.3) com.alicloud.smartdrive/28278449 Channel/201200 AliApp(AYSD/4.1.3) com.alicloud.smartdrive/4.1.3 Version/16.3 Channel/201200 Language/zh-Hans-CN /iOS Mobile/iPhone15,2 language/zh-Hans-CN",
Authorization: "Bearer "+token
}
const data = {"signInDay":`${day}`}
let res = await axios.post(url, data, {headers})
if (res.data.success) {
console.log(`奖励:${res.data.result.name},${res.data.result.description},${res.data.result.notice}!`);
msg = `奖励:${res.data.result.name},${res.data.result.description},${res.data.result.notice}!`
}
else {
console.log(`奖励获取失败:${res.data.message}!`);
msg = "奖励获取失败"
}
} catch (error){
console.log(error);
msg = "奖励接口请求失败";
}
resolve("【阿里云盘】:" + msg || "正常运行了")
})
}
module.exports = aliyun;
解析
脚本是为阿里云盘设计的自动化签到和奖励获取工具。它通过 axios 库发送 HTTP 请求以自动刷新令牌、签到和获取签到奖励。
aliyun()
功能:主函数,用于刷新阿里云盘的访问令牌,并在成功后尝试签到。流程:
通过 POST 请求发送到阿里云的认证服务器以刷新访问令牌。
根据响应判断令牌是否成功刷新。若刷新失败,可能是因为令牌过期或参数错误。
若刷新成功,调用 sign()函数进行签到。
sign(token, name)
功能:使用刷新后的令牌进行签到操作。流程:
发送签到请求到阿里云盘的签到 API。
根据 API 返回的成功或失败状态,打印相应的签到信息或错误信息。
如果签到成功,调用 reward()函数获取签到奖励。
reward(day)
功能:根据签到天数,请求签到奖励。流程:
发送请求到阿里云盘的签到奖励 API。
根据返回的结果,打印奖励信息或错误信息。
错误处理
每个函数都包括 try-catch 结构,以处理可能发生的 API 请求错误,如网络问题或服务端错误。消息处理
每个步骤中,相关的操作结果通过控制台日志输出,并通过变量 msg 构建最终用户可见的消息。这些消息通过 Promise 的 resolve 返回。异步处理
所有网络请求均采用异步方式,通过 async 和 await 确保请求按顺序执行,且在进入下一步之前完成。使用场景
这个脚本适用于自动化阿里云盘的日常任务,如自动签到和获取奖励,特别适用于希望自动化这些重复性任务的用户。模块导出
aliyun 函数通过 module.exports 导出,允许在其他 Node.js 环境中导入和使用这个函数。
阿里云盘任务脚本(2025 年 1 月版)
import json
import os
import requests
import urllib3
from dailycheckin import CheckIn
urllib3.disable_warnings()
class AliYun(CheckIn):
name = "阿里云盘"
def __init__(self, check_item: dict):
self.check_item = check_item
def update_token(self, refresh_token):
url = "https://auth.aliyundrive.com/v2/account/token"
data = {"grant_type": "refresh_token", "refresh_token": refresh_token}
response = requests.post(url=url, json=data).json()
access_token = response.get("access_token")
return access_token
def sign(self, access_token):
url = "https://member.aliyundrive.com/v1/activity/sign_in_list"
headers = {"Authorization": access_token, "Content-Type": "application/json"}
result = requests.post(url=url, headers=headers, json={}).json()
sign_days = result["result"]["signInCount"]
data = {"signInDay": sign_days}
url_reward = "https://member.aliyundrive.com/v1/activity/sign_in_reward"
requests.post(url=url_reward, headers=headers, data=json.dumps(data))
if "success" in result:
print("签到成功")
for i, j in enumerate(result["result"]["signInLogs"]):
if j["status"] == "miss":
day_json = result["result"]["signInLogs"][i - 1]
if not day_json["isReward"]:
msg = [
{
"name": "阿里云盘",
"value": "签到成功,今日未获得奖励",
}
]
else:
msg = [
{
"name": "累计签到",
"value": result["result"]["signInCount"],
},
{
"name": "阿里云盘",
"value": "获得奖励:{}{}".format(
day_json["reward"]["name"],
day_json["reward"]["description"],
),
},
]
return msg
def main(self):
refresh_token = self.check_item.get("refresh_token")
access_token = self.update_token(refresh_token)
if not access_token:
return [{"name": "阿里云盘", "value": "token 过期"}]
msg = self.sign(access_token)
msg = "\n".join([f"{one.get('name')}: {one.get('value')}" for one in msg])
return msg
if __name__ == "__main__":
with open(
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config.json"),
encoding="utf-8",
) as f:
datas = json.loads(f.read())
_check_item = datas.get("ALIYUN", [])[0]
print(AliYun(check_item=_check_item).main())
解析:
这个脚本是通过 阿里云盘 的 API 自动进行签到操作,并返回签到结果和奖励情况。继承自 CheckIn 类,用于每日自动签到并输出签到结果。
主要方法
update_token(refresh_token):该方法使用传入的 refresh_token 获取新的 access_token,用于后续的 API 请求。
sign(access_token):
使用获取到的 access_token 进行签到,返回签到结果。
如果当日未获得奖励,会显示“今日未获得奖励”,否则会显示累计签到天数和奖励详情。
main():
该方法是脚本的入口,获取配置中的 refresh_token,然后更新 access_token,并调用 sign 方法进行签到。
最后格式化输出签到信息。
该脚本自动更新 access_token,然后使用该令牌在 阿里云盘 完成签到任务,并返回签到的状态信息和奖励详情。如果 access_token 过期,则返回“token 过期”的提示。
有道云笔记任务脚本
import json
import os
import requests
from dailycheckin import CheckIn
class YouDao(CheckIn):
name = "有道云笔记"
def __init__(self, check_item):
self.check_item = check_item
@staticmethod
def sign(cookies):
ad_space = 0
refresh_cookies_res = requests.get(
"http://note.youdao.com/login/acc/pe/getsess?product=YNOTE", cookies=cookies
)
cookies = dict(refresh_cookies_res.cookies)
url = "https://note.youdao.com/yws/api/daupromotion?method=sync"
res = requests.post(url=url, cookies=cookies)
if "error" not in res.text:
checkin_response = requests.post(
url="https://note.youdao.com/yws/mapi/user?method=checkin",
cookies=cookies,
)
for i in range(3):
ad_response = requests.post(
url="https://note.youdao.com/yws/mapi/user?method=adRandomPrompt",
cookies=cookies,
)
ad_space += ad_response.json().get("space", 0) // 1048576
if "reward" in res.text:
sync_space = res.json().get("rewardSpace", 0) // 1048576
checkin_space = checkin_response.json().get("space", 0) // 1048576
space = sync_space + checkin_space + ad_space
youdao_message = f"+{space}M"
else:
youdao_message = "获取失败"
else:
youdao_message = "Cookie 可能过期"
return youdao_message
def main(self):
youdao_cookie = {
item.split("=")[0]: item.split("=")[1]
for item in self.check_item.get("cookie").split("; ")
}
try:
ynote_pers = youdao_cookie.get("YNOTE_PERS", "")
uid = ynote_pers.split("||")[-2]
except Exception as e:
print(f"获取账号信息失败: {e}")
uid = "未获取到账号信息"
msg = self.sign(cookies=youdao_cookie)
msg = [
{"name": "帐号信息", "value": uid},
{"name": "获取空间", "value": msg},
]
msg = "\n".join([f"{one.get('name')}: {one.get('value')}" for one in msg])
return msg
if __name__ == "__main__":
with open(
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config.json"),
encoding="utf-8",
) as f:
datas = json.loads(f.read())
_check_item = datas.get("YOUDAO", [])[0]
print(YouDao(check_item=_check_item).main())
解析
这脚本实现了一个有道云笔记的签到功能,并获取相应的空间奖励。它继承自 CheckIn 类,专门用来处理各类平台的签到操作。
YouDao 类:这是继承自 CheckIn 的子类,主要用于处理有道云笔记的签到。
name = “有道云笔记”:定义了该类的名称。
__init__(self, check_item):构造函数,用于初始化 check_item,这个 check_item 包含了用户的 cookie 等信息。
sign 方法:
cookies:方法的参数是一个字典类型的 cookies,通常是登录成功后获取的 cookies。
获取 Session:首先通过 requests.get 获取一个 session,并更新 cookies。
签到操作:通过 requests.post 向有道云笔记的 API 接口发送请求,进行签到操作。
获取奖励:签到成功后,系统会奖励一定的空间,并会尝试获取广告空间。每次请求广告接口会返回一定的“空间”,通过累加获得广告空间的总量。
奖励空间计算:最终计算出总的空间奖励,并通过字符串返回。空间的单位为 MB(通过 1048576 除以字节数转换为 MB)。
main 方法:
从 check_item 中获取 cookie 信息,并解析出 YNOTE_PERS,进一步提取出 uid(用户 ID)。
调用 sign 方法执行签到操作并获取奖励空间。
最终构造一个包含用户信息和奖励信息的消息,并返回该消息。
程序运行:
在 __main__ 部分,首先从配置文件 config.json 中读取配置项(可能包括有道云笔记的 cookie 等信息),然后执行 YouDao 类的 main()方法,打印出签到结果。
运行逻辑:
获取 cookies:main 方法会从配置文件读取用户的 cookie 信息,然后通过这些 cookies 执行签到。签到及奖励:调用 sign 方法,进行签到并累加空间奖励(广告奖励和签到奖励)。
返回信息:通过 msg 返回用户信息和空间奖励信息。
配置文件(config.json)示例:
{
“YOUDAO”: [
{
“cookie”: “YNOTE_PERS=xxxx; another_cookie=yyyy”
}
]
}输出结果
执行 main()方法后,打印的输出类似于:
帐号信息: 用户 ID
获取空间: +100M
错误处理:
如果获取 uid 时出错,会返回”未获取到账号信息”。如果 cookie 过期或接口调用失败,会返回”Cookie 可能过期”或”获取失败”。
哔哩哔哩任务脚本
import json
import os
import time
import requests
from dailycheckin import CheckIn
class BiliBili(CheckIn):
name = "Bilibili"
def __init__(self, check_item: dict):
self.check_item = check_item
@staticmethod
def get_nav(session):
url = "https://api.bilibili.com/x/web-interface/nav"
ret = session.get(url=url).json()
uname = ret.get("data", {}).get("uname")
uid = ret.get("data", {}).get("mid")
is_login = ret.get("data", {}).get("isLogin")
coin = ret.get("data", {}).get("money")
vip_type = ret.get("data", {}).get("vipType")
current_exp = ret.get("data", {}).get("level_info", {}).get("current_exp")
return uname, uid, is_login, coin, vip_type, current_exp
@staticmethod
def get_today_exp(session: requests.Session) -> list:
"""GET 获取今日经验信息
:param requests.Session session:
:return list: 今日经验信息列表
"""
url = "https://api.bilibili.com/x/member/web/exp/log?jsonp=jsonp"
today = time.strftime("%Y-%m-%d", time.localtime())
return list(
filter(
lambda x: x["time"].split()[0] == today,
session.get(url=url).json().get("data").get("list"),
)
)
@staticmethod
def vip_privilege_my(session) -> dict:
"""取 B 站大会员硬币经验信息"""
url = "https://api.bilibili.com/x/vip/privilege/my"
ret = session.get(url=url).json()
return ret
@staticmethod
def reward(session) -> dict:
"""取 B 站经验信息"""
url = "https://api.bilibili.com/x/member/web/exp/log?jsonp=jsonp"
today = time.strftime("%Y-%m-%d", time.localtime())
return list(
filter(
lambda x: x["time"].split()[0] == today,
session.get(url=url).json().get("data").get("list"),
)
)
@staticmethod
def live_sign(session) -> dict:
"""B 站直播签到"""
try:
url = "https://api.live.bilibili.com/xlive/web-ucenter/v1/sign/DoSign"
ret = session.get(url=url).json()
if ret["code"] == 0:
msg = f'签到成功,{ret["data"]["text"]},特别信息:{ret["data"]["specialText"]},本月已签到{ret["data"]["hadSignDays"]}天'
elif ret["code"] == 1011040:
msg = "今日已签到过,无法重复签到"
else:
msg = f'签到失败,信息为: {ret["message"]}'
except Exception as e:
msg = f"签到异常,原因为{str(e)}"
print(msg)
return msg
@staticmethod
def manga_sign(session, platform="android") -> dict:
"""
模拟 B 站漫画客户端签到
"""
try:
url = "https://manga.bilibili.com/twirp/activity.v1.Activity/ClockIn"
post_data = {"platform": platform}
ret = session.post(url=url, data=post_data).json()
if ret["code"] == 0:
msg = "签到成功"
elif ret["msg"] == "clockin clockin is duplicate":
msg = "今天已经签到过了"
else:
msg = f'签到失败,信息为({ret["msg"]})'
print(msg)
except Exception as e:
msg = f"签到异常,原因为: {str(e)}"
print(msg)
return msg
@staticmethod
def vip_privilege_receive(session, bili_jct, receive_type: int = 1) -> dict:
"""
领取 B 站大会员权益
receive_type int 权益类型,1 为 B 币劵,2 为优惠券
"""
url = "https://api.bilibili.com/x/vip/privilege/receive"
post_data = {"type": receive_type, "csrf": bili_jct}
ret = session.post(url=url, data=post_data).json()
return ret
@staticmethod
def vip_manga_reward(session) -> dict:
"""获取漫画大会员福利"""
url = "https://manga.bilibili.com/twirp/user.v1.User/GetVipReward"
ret = session.post(url=url, json={"reason_id": 1}).json()
return ret
@staticmethod
def report_task(session, bili_jct, aid: int, cid: int, progres: int = 300) -> dict:
"""
B 站上报视频观看进度
aid int 视频 av 号
cid int 视频 cid 号
progres int 观看秒数
"""
url = "http://api.bilibili.com/x/v2/history/report"
post_data = {"aid": aid, "cid": cid, "progres": progres, "csrf": bili_jct}
ret = session.post(url=url, data=post_data).json()
return ret
@staticmethod
def share_task(session, bili_jct, aid) -> dict:
"""
分享指定 av 号视频
aid int 视频 av 号
"""
url = "https://api.bilibili.com/x/web-interface/share/add"
post_data = {"aid": aid, "csrf": bili_jct}
ret = session.post(url=url, data=post_data).json()
return ret
@staticmethod
def get_followings(
session,
uid: int,
pn: int = 1,
ps: int = 50,
order: str = "desc",
order_type: str = "attention",
) -> dict:
"""
获取指定用户关注的 up 主
uid int 账户 uid,默认为本账户,非登录账户只能获取 20 个*5 页
pn int 页码,默认第一页
ps int 每页数量,默认 50
order str 排序方式,默认 desc
order_type 排序类型,默认 attention
"""
params = {
"vmid": uid,
"pn": pn,
"ps": ps,
"order": order,
"order_type": order_type,
}
url = "https://api.bilibili.com/x/relation/followings"
ret = session.get(url=url, params=params).json()
return ret
@staticmethod
def space_arc_search(
session,
uid: int,
pn: int = 1,
ps: int = 30,
tid: int = 0,
order: str = "pubdate",
keyword: str = "",
) -> dict:
"""
获取指定 up 主空间视频投稿信息
uid int 账户 uid,默认为本账户
pn int 页码,默认第一页
ps int 每页数量,默认 50
tid int 分区 默认为 0(所有分区)
order str 排序方式,默认 pubdate
keyword str 关键字,默认为空
"""
params = {
"mid": uid,
"pn": pn,
"Ps": ps,
"tid": tid,
"order": order,
"keyword": keyword,
}
url = "https://api.bilibili.com/x/space/arc/search"
ret = session.get(url=url, params=params).json()
count = 2
data_list = [
{
"aid": one.get("aid"),
"cid": 0,
"title": one.get("title"),
"owner": one.get("author"),
}
for one in ret.get("data", {}).get("list", {}).get("vlist", [])[:count]
]
return data_list, count
@staticmethod
def elec_pay(session, bili_jct, uid: int, num: int = 50) -> dict:
"""
用 B 币给 up 主充电
uid int up 主 uid
num int 充电电池数量
"""
url = "https://api.bilibili.com/x/ugcpay/trade/elec/pay/quick"
post_data = {
"elec_num": num,
"up_mid": uid,
"otype": "up",
"oid": uid,
"csrf": bili_jct,
}
ret = session.post(url=url, data=post_data).json()
return ret
@staticmethod
def coin_add(
session, bili_jct, aid: int, num: int = 1, select_like: int = 1
) -> dict:
"""
给指定 av 号视频投币
aid int 视频 av 号
num int 投币数量
select_like int 是否点赞
"""
url = "https://api.bilibili.com/x/web-interface/coin/add"
post_data = {
"aid": aid,
"multiply": num,
"select_like": select_like,
"cross_domain": "true",
"csrf": bili_jct,
}
ret = session.post(url=url, data=post_data).json()
return ret
@staticmethod
def live_status(session) -> dict:
"""B 站直播获取金银瓜子状态"""
url = "https://api.live.bilibili.com/pay/v1/Exchange/getStatus"
ret = session.get(url=url).json()
data = ret.get("data")
silver = data.get("silver", 0)
gold = data.get("gold", 0)
coin = data.get("coin", 0)
msg = [
{"name": "硬币数量", "value": coin},
{"name": "金瓜子数", "value": gold},
{"name": "银瓜子数", "value": silver},
]
return msg
@staticmethod
def get_region(session, rid=1, num=6) -> dict:
"""
获取 B 站分区视频信息
rid int 分区号
num int 获取视频数量
"""
url = (
"https://api.bilibili.com/x/web-interface/dynamic/region?ps="
+ str(num)
+ "&rid="
+ str(rid)
)
ret = session.get(url=url).json()
data_list = [
{
"aid": one.get("aid"),
"cid": one.get("cid"),
"title": one.get("title"),
"owner": one.get("owner", {}).get("name"),
}
for one in ret.get("data", {}).get("archives", [])
]
return data_list
@staticmethod
def silver2coin(session, bili_jct) -> dict:
"""B 站银瓜子换硬币"""
url = "https://api.live.bilibili.com/xlive/revenue/v1/wallet/silver2coin"
post_data = {"csrf": bili_jct}
ret = session.post(url=url, data=post_data).json()
return ret
def main(self):
bilibili_cookie = {
item.split("=")[0]: item.split("=")[1]
for item in self.check_item.get("cookie").split("; ")
}
bili_jct = bilibili_cookie.get("bili_jct")
coin_num = self.check_item.get("coin_num", 0)
coin_type = self.check_item.get("coin_type", 1)
silver2coin = self.check_item.get("silver2coin", False)
session = requests.session()
requests.utils.add_dict_to_cookiejar(session.cookies, bilibili_cookie)
session.headers.update(
{
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.64",
"Referer": "https://www.bilibili.com/",
"accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
"Connection": "keep-alive",
}
)
success_count = 0
uname, uid, is_login, coin, vip_type, current_exp = self.get_nav(
session=session
)
if is_login:
manhua_msg = self.manga_sign(session=session)
live_msg = self.live_sign(session=session)
aid_list = self.get_region(session=session)
vip_privilege_my_ret = self.vip_privilege_my(session=session)
welfare_list = vip_privilege_my_ret.get("data", {}).get("list", [])
for welfare in welfare_list:
if welfare.get("state") == 0 and welfare.get("vip_type") == vip_type:
vip_privilege_receive_ret = self.vip_privilege_receive(
session=session,
bili_jct=bili_jct,
receive_type=welfare.get("type"),
)
print(vip_privilege_receive_ret)
coins_av_count = len(
list(
filter(
lambda x: x["reason"] == "视频投币奖励",
self.get_today_exp(session=session),
)
)
)
coin_num = coin_num - coins_av_count
coin_num = coin_num if coin_num < coin else coin
if coin_type == 1:
following_list = self.get_followings(session=session, uid=uid)
count = 0
for following in following_list.get("data", {}).get("list"):
mid = following.get("mid")
if mid:
tmplist, tmpcount = self.space_arc_search(
session=session, uid=mid
)
aid_list += tmplist
count += tmpcount
if count > coin_num:
print("已获取足够关注用户的视频")
break
else:
aid_list += self.get_region(session=session)
for one in aid_list[::-1]:
print(one)
if coin_num > 0:
for aid in aid_list[::-1]:
ret = self.coin_add(
session=session, aid=aid.get("aid"), bili_jct=bili_jct
)
if ret["code"] == 0:
coin_num -= 1
print(f'成功给{aid.get("title")}投一个币')
success_count += 1
elif ret["code"] == 34005:
print(f'投币{aid.get("title")}失败,原因为{ret["message"]}')
continue
# -104 硬币不够了 -111 csrf 失败 34005 投币达到上限
else:
print(f'投币{aid.get("title")}失败,原因为{ret["message"]},跳过投币')
break
if coin_num <= 0:
break
coin_msg = f"今日成功投币{success_count + coins_av_count}/{self.check_item.get('coin_num', 5)}个"
else:
coin_msg = (
f"今日成功投币{coins_av_count}/{self.check_item.get('coin_num', 5)}个"
)
aid = aid_list[0].get("aid")
cid = aid_list[0].get("cid")
title = aid_list[0].get("title")
report_ret = self.report_task(
session=session, bili_jct=bili_jct, aid=aid, cid=cid
)
if report_ret.get("code") == 0:
report_msg = f"观看《{title}》300 秒"
else:
report_msg = "任务失败"
share_ret = self.share_task(session=session, bili_jct=bili_jct, aid=aid)
if share_ret.get("code") == 0:
share_msg = f"分享《{title}》成功"
else:
share_msg = "分享失败"
print(share_msg)
s2c_msg = "不兑换硬币"
if silver2coin:
silver2coin_ret = self.silver2coin(session=session, bili_jct=bili_jct)
s2c_msg = silver2coin_ret["message"]
if silver2coin_ret["code"] != 0:
print(s2c_msg)
else:
s2c_msg = ""
live_stats = self.live_status(session=session)
uname, uid, is_login, new_coin, vip_type, new_current_exp = self.get_nav(
session=session
)
today_exp = sum(
map(lambda x: x["delta"], self.get_today_exp(session=session))
)
update_data = (28800 - new_current_exp) // (today_exp if today_exp else 1)
msg = [
{"name": "帐号信息", "value": uname},
{"name": "漫画签到", "value": manhua_msg},
{"name": "直播签到", "value": live_msg},
{"name": "登陆任务", "value": "今日已登陆"},
{"name": "观看视频", "value": report_msg},
{"name": "分享任务", "value": share_msg},
{"name": "瓜子兑换", "value": s2c_msg},
{"name": "投币任务", "value": coin_msg},
{"name": "今日经验", "value": today_exp},
{"name": "当前经验", "value": new_current_exp},
{"name": "升级还需", "value": f"{update_data}天"},
] + live_stats
msg = "\n".join([f"{one.get('name')}: {one.get('value')}" for one in msg])
return msg
if __name__ == "__main__":
with open(
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config.json"),
encoding="utf-8",
) as f:
datas = json.loads(f.read())
_check_item = datas.get("BILIBILI", [])[0]
print(BiliBili(check_item=_check_item).main())
解析
这脚本是用于自动化执行 Bilibili(哔哩哔哩)网站的一些任务,包括签到、投币、观看视频等。它通过与 Bilibili 的 API 接口进行交互来完成各种任务,并返回执行结果。
主要功能
get_nav(session):获取当前登录用户的基本信息,包括用户名、UID、登录状态、硬币数、VIP 类型和当前经验值等。
get_today_exp(session):
获取当天的经验信息,包括用户参与的各种活动(如签到、投币等)所获得的经验。
live_sign(session):
执行 Bilibili 直播签到,并返回签到成功的消息。
manga_sign(session):
执行 Bilibili 漫画客户端签到,并返回签到的结果。
vip_privilege_receive(session, bili_jct, receive_type):
领取 Bilibili 大会员的权益,如 B 币券或优惠券。
coin_add(session, bili_jct, aid, num):
给指定的 AV 号视频投币,可以投多个硬币。
report_task(session, bili_jct, aid, cid, progres):
上报视频观看进度,帮助完成观看视频的任务。
share_task(session, bili_jct, aid):
分享指定的 AV 号视频。
get_region(session, rid, num):
获取 Bilibili 分区的最新视频信息。
silver2coin(session, bili_jct):
将 Bilibili 的银瓜子兑换成硬币。
main():
该方法是执行任务的主入口。根据配置文件中的设定,它将执行多个操作,如签到、投币、观看视频、分享等。
任务完成后,会汇总所有任务的执行情况,返回一个包含用户信息和任务执行情况的消息。
流程:
读取配置文件获取必要的信息。执行多个任务(签到、投币、观看视频等),并收集执行结果。
汇总结果并返回一个包含所有任务状态的消息。
细节:
Cookie 管理:代码使用 requests 会话来模拟登录状态,保持会话中的 cookies 信息。错误处理:对于 API 请求和任务执行中的异常,代码进行了适当的错误处理,确保出现问题时能给出清晰的错误提示。
任务顺序:代码会先执行签到、投币等任务,并根据配置执行剩余的任务(如视频分享、领取大会员福利等)。
这个脚本是一个较为完整的 Bilibili 自动化任务脚本,通过与 Bilibili API 的交互,实现了签到、投币、观看视频、分享视频等一系列常见任务的自动化,适合需要每天自动完成这些任务的用户。