腾讯云函数实现今日校园自动填写

说明

学习交流使用

工具

腾讯云函数

注意

1 附件里面的config.yml只有HFUT可以用,其他学校需要用generate.py来进行配置
2 少部分学校不支持云函数

使用步骤

  1. 配置config.yml中对应的学号(username)和密码(password)还有地址(address)等等信息,详情请看config.yml中的注释说明,注意这里的学号和密码都是今日校园的学号和密码
  2. 打开百度搜索腾讯云函数,注册认证后
    1. 进入控制台
      20201220130810
    2. 进入云函数
      2
    3. 点击左边的层
      3
    4. 点新建,名称随意
      4
    5. 然后点击上传zip,选择dependency.zip上传
      20201220132539
    6. 选择运行环境python3.6
      20201220132701
    7. 然后点击确定,耐心等待一下,上传依赖包需要花费的时间比较长, 完成后界面如下
      20201220133327
    8. 点左边的函数服务,新建云函数
      20201220133453
      名称随意,运行环境选择python3.6,创建方式选择空白函数,然后点击下一步
      20201220133705
  3. 提交方法选择在线编辑,把附件里面的index.py直接全文复制粘贴到云函数的index.py
    20201220134004
  4. 然后点击文件->新建,文件名命名为config.yml,然后把下面附件里面config.yml的内容直接全文复制粘贴到云函数的config.yml文件(别的学校需要自己用generate.py配置好),点击下面的高级设置,设置超时时间为60秒,添加层为刚刚新建的函数依赖层,然后点击完成
  5. 进入新建好的云函数,左边点击触发管理,点击创建触发器,名称随意,触发周期选择自定义,然后配置cron表达式,下面的表达式表示每天晚上20:20执行
    0 20 20 * * * *
    

附件

dependcy.zip

链接

config.yml

#登陆相关配置
login:
  #开放的模拟登陆api服务器地址
  api: "http://www.zimo.wiki:8080/wisedu-unified-login-api-v1.0/api/login"
#用户组配置
users:
  #单个用户配置
  - user:
      #username 学号或者工号
      username: \'1111111111\'
      #password 密码
      password: \'111111111\'
      #address 地址,定位信息
      address: 中国四川省成都市金牛区一环路北1段-129号-附9号
      #email 接受通知消息的邮箱
      email: 1111111@qq.com
      #school 学校全称
      school: 合肥工业大学
  #多用户配置,将下面的注释去掉即可,如果有表单内容有图片,不建议使用多用户配置
#  - user:
#      #username 学号或者工号
#      username: 161105024
#      #password 密码
#      password: 161105024
#      #address 地址
#      address: 中国四川省成都市金牛区一环路北1段-129号-附9号
#      #email 接受通知消息的邮箱
#      email: 461009747@qq.com
#      school: 宜宾学院
#今日校园相关配置
cpdaily:
  #表单组默认选项配置
  defaults:
    #表单默认选项配置,按顺序,注意,只有标必填项的才处理
    - default:
        title: 今日在校情况
        type: 2
        value: 在校,校内住宿
    - default:
        title: 今日所在地
        type: 1
        value: 安徽省/合肥市/蜀山区
    - default:
        title: 今日晨检体温
        type: 2
        value: 正常,不发热
    - default:
        title: 今日午检体温
        type: 2
        value: 正常,不发热
    - default:
        title: 今日晚检体温
        type: 2
        value: 正常,不发热
    - default:
        title: 今日是否有咳嗽、呼吸困难、腹泻症状
        type: 2
        value: 否,无上述症状
    - default:
        title: 今日是否被医学隔离观察
        type: 2
        value: 否,未隔离
    - default:
        title: 简述今日主要行程
        type: 1
        value: 校内
    - default:
        title: 今日晚间返回宿舍(家)时间
        type: 1
        value: 2020-12-19/22:00

Info:
  ServerChan: # 填写Server酱的SCKEY
  Qsmg: # 填写Qsmg酱的SCKEY
  Email:
    enable: true
    server: smtp.exmail.qq.com # 填写邮件的smtp服务器
    port: 465 # 填写邮件服务器的端口号
    account: \'\' # 邮件服务器登录用户名
    password: \'\' # 邮件服务器登录密码

generate.py

  1. 功能:生成config.yml,今日校园问题的配置
  2. 如学校为hfut,不用该步骤
  3. 如其他学校,需要输入自己学校今日校园用户名和密码信息
# -*- coding: utf-8 -*-
import index as app
import yaml

# 生成默认配置
def generate():
    config = app.config
    user = config[\'users\'][0]
    apis = app.getCpdailyApis(user)
    session = app.getSession(user, apis[\'login-url\'])
    form = dict(app.queryForm(session, apis))[\'form\']
    # app.log(form)
    defaults = []
    sort = 1
    for formItem in form:
        if formItem[\'isRequired\'] == 1:
            default = {}
            one = {}
            default[\'title\'] = formItem[\'title\']
            default[\'type\'] = formItem[\'fieldType\']
            print(\'问题%d:\' % sort + default[\'title\'])
            if default[\'type\'] == 1 or default[\'type\'] == 5:
                default[\'value\'] = input("请输入文本:")
            if default[\'type\'] == 2:
                fieldItems = formItem[\'fieldItems\']
                num = 1
                for fieldItem in fieldItems:
                    print(\'\t%d \' % num + fieldItem[\'content\'])
                    num += 1
                choose = int(input("请输入序号:"))
                if choose < 1 or choose > num:
                    print(\'输入错误,请重新执行此脚本\')
                    exit(-1)
                default[\'value\'] = fieldItems[choose - 1][\'content\']
            if default[\'type\'] == 3:
                fieldItems = formItem[\'fieldItems\']
                num = 1
                for fieldItem in fieldItems:
                    print(\'\t%d \' % num + fieldItem[\'content\'])
                    num += 1
                chooses = list(map(int, input(\'请输入序号(可输入多个,请用空格隔开):\').split()))
                default[\'value\'] = \'\'
                for i in range(0, len(chooses)):
                    choose = chooses[i]
                    if choose < 1 or choose > num:
                        print(\'输入错误,请重新执行此脚本\')
                        exit(-1)
                    if i != len(chooses) - 1:
                        default[\'value\'] += fieldItems[choose - 1][\'content\'] + \',\'
                    else:
                        default[\'value\'] += fieldItems[choose - 1][\'content\']
            if default[\'type\'] == 4:
                default[\'value\'] = input("请输入图片名称:")
            one[\'default\'] = default
            defaults.append(one)
            sort += 1
    print(\'======================分隔线======================\')
    print(yaml.dump(defaults, allow_unicode=True))

if __name__ == "__main__":
    generate()

index.py

  1. 功能:自动打卡,需要配置好config.yml
# -*- coding: utf-8 -*-
import sys
import requests
import json
import yaml
import oss2
from urllib.parse import urlparse
from datetime import datetime, timedelta, timezone
from urllib3.exceptions import InsecureRequestWarning
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr

# debug模式
debug = False
if debug:
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)


# 读取yml配置
def getYmlConfig(yaml_file=\'config.yml\'):
    file = open(yaml_file, \'r\', encoding="utf-8")
    file_data = file.read()
    file.close()
    config = yaml.load(file_data, Loader=yaml.FullLoader)
    return dict(config)


# 全局配置
config = getYmlConfig(yaml_file=\'config.yml\')


# 获取今日校园api
def getCpdailyApis(user):
    apis = {}
    user = user[\'user\']
    schools = requests.get(
        url=\'https://mobile.campushoy.com/v6/config/guest/tenant/list\', verify=not debug).json()[\'data\']
    flag = True
    for one in schools:
        if one[\'name\'] == user[\'school\']:
            if one[\'joinType\'] == \'NONE\':
                log(user[\'school\'] + \' 未加入今日校园\')
                exit(-1)
            flag = False
            params = {
                \'ids\': one[\'id\']
            }
            res = requests.get(url=\'https://mobile.campushoy.com/v6/config/guest/tenant/info\', params=params,
                               verify=not debug)
            data = res.json()[\'data\'][0]
            joinType = data[\'joinType\']
            idsUrl = data[\'idsUrl\']
            ampUrl = data[\'ampUrl\']
            if \'campusphere\' in ampUrl or \'cpdaily\' in ampUrl:
                parse = urlparse(ampUrl)
                host = parse.netloc
                res = requests.get(parse.scheme + \'://\' + host)
                parse = urlparse(res.url)
                apis[
                    \'login-url\'] = idsUrl + \'/login?service=\' + parse.scheme + r"%3A%2F%2F" + host + r\'%2Fportal%2Flogin\'
                apis[\'host\'] = host

            ampUrl2 = data[\'ampUrl2\']
            if \'campusphere\' in ampUrl2 or \'cpdaily\' in ampUrl2:
                parse = urlparse(ampUrl2)
                host = parse.netloc
                res = requests.get(parse.scheme + \'://\' + host)
                parse = urlparse(res.url)
                apis[
                    \'login-url\'] = idsUrl + \'/login?service=\' + parse.scheme + r"%3A%2F%2F" + host + r\'%2Fportal%2Flogin\'
                apis[\'host\'] = host
            break
    if flag:
        log(user[\'school\'] + \' 未找到该院校信息,请检查是否是学校全称错误\')
        exit(-1)
    log(apis)
    return apis


# 获取当前utc时间,并格式化为北京时间
def getTimeStr():
    utc_dt = datetime.utcnow().replace(tzinfo=timezone.utc)
    bj_dt = utc_dt.astimezone(timezone(timedelta(hours=8)))
    return bj_dt.strftime("%Y-%m-%d %H:%M:%S")


# 输出调试信息,并及时刷新缓冲区
def log(content):
    print(getTimeStr() + \' \' + str(content))
    sys.stdout.flush()


# 登陆并返回session
def getSession(user, loginUrl):
    user = user[\'user\']
    params = {
        \'login_url\': loginUrl,
        # 保证学工号和密码正确下面两项就不需要配置
        \'needcaptcha_url\': \'\',
        \'captcha_url\': \'\',
        \'username\': user[\'username\'],
        \'password\': user[\'password\']
    }

    cookies = {}
    # 借助上一个项目开放出来的登陆API,模拟登陆
    res = requests.post(config[\'login\'][\'api\'], params, verify=not debug)
    cookieStr = str(res.json()[\'cookies\'])
    log(cookieStr)
    if cookieStr == \'None\':
        log(res.json())
        return None

    # 解析cookie
    for line in cookieStr.split(\';\'):
        name, value = line.strip().split(\'=\', 1)
        cookies[name] = value
    session = requests.session()
    session.cookies = requests.utils.cookiejar_from_dict(cookies)
    return session


# 查询表单
def queryForm(session, apis):
    host = apis[\'host\']
    headers = {
        \'Accept\': \'application/json, text/plain, */*\',
        \'User-Agent\': \'Mozilla/5.0 (Linux; Android 4.4.4; OPPO R11 Plus Build/KTU84P) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/33.0.0.0 Safari/537.36 yiban/8.1.11 cpdaily/8.1.11 wisedu/8.1.11\',
        \'content-type\': \'application/json\',
        \'Accept-Encoding\': \'gzip,deflate\',
        \'Accept-Language\': \'zh-CN,en-US;q=0.8\',
        \'Content-Type\': \'application/json;charset=UTF-8\'
    }
    queryCollectWidUrl = \'https://{host}/wec-counselor-collector-apps/stu/collector/queryCollectorProcessingList\'.format(
        host=host)
    params = {
        \'pageSize\': 6,
        \'pageNumber\': 1
    }
    res = session.post(queryCollectWidUrl, headers=headers,
                       data=json.dumps(params), verify=not debug)
    if len(res.json()[\'datas\'][\'rows\']) < 1:
        return None

    collectWid = res.json()[\'datas\'][\'rows\'][0][\'wid\']
    formWid = res.json()[\'datas\'][\'rows\'][0][\'formWid\']

    detailCollector = \'https://{host}/wec-counselor-collector-apps/stu/collector/detailCollector\'.format(
        host=host)
    res = session.post(url=detailCollector, headers=headers,
                       data=json.dumps({"collectorWid": collectWid}), verify=not debug)
    schoolTaskWid = res.json()[\'datas\'][\'collector\'][\'schoolTaskWid\']

    getFormFields = \'https://{host}/wec-counselor-collector-apps/stu/collector/getFormFields\'.format(
        host=host)
    res = session.post(url=getFormFields, headers=headers, data=json.dumps(
        {"pageSize": 100, "pageNumber": 1, "formWid": formWid, "collectorWid": collectWid}), verify=not debug)

    form = res.json()[\'datas\'][\'rows\']
    return {\'collectWid\': collectWid, \'formWid\': formWid, \'schoolTaskWid\': schoolTaskWid, \'form\': form}


# 填写form
def fillForm(session, form, host):
    sort = 1
    for formItem in form[:]:
        # 只处理必填项
        if formItem[\'isRequired\'] == 1:
            default = config[\'cpdaily\'][\'defaults\'][sort - 1][\'default\']
            if formItem[\'title\'] != default[\'title\']:
                log(\'第%d个默认配置不正确,请检查\' % sort)
                exit(-1)
            # 文本直接赋值
            if formItem[\'fieldType\'] == 1 or formItem[\'fieldType\'] == 5:
                formItem[\'value\'] = default[\'value\']
            # 单选框需要删掉多余的选项
            if formItem[\'fieldType\'] == 2:
                # 填充默认值
                formItem[\'value\'] = default[\'value\']
                fieldItems = formItem[\'fieldItems\']
                for i in range(0, len(fieldItems))[::-1]:
                    if fieldItems[i][\'content\'] != default[\'value\']:
                        del fieldItems[i]
            # 多选需要分割默认选项值,并且删掉无用的其他选项
            if formItem[\'fieldType\'] == 3:
                fieldItems = formItem[\'fieldItems\']
                defaultValues = default[\'value\'].split(\',\')
                for i in range(0, len(fieldItems))[::-1]:
                    flag = True
                    for j in range(0, len(defaultValues))[::-1]:
                        if fieldItems[i][\'content\'] == defaultValues[j]:
                            # 填充默认值
                            formItem[\'value\'] += defaultValues[j] + \' \'
                            flag = False
                    if flag:
                        del fieldItems[i]
            # 图片需要上传到阿里云oss
            if formItem[\'fieldType\'] == 4:
                fileName = uploadPicture(session, default[\'value\'], host)
                formItem[\'value\'] = getPictureUrl(session, fileName, host)
            log(\'必填问题%d:\' % sort + formItem[\'title\'])
            log(\'答案%d:\' % sort + formItem[\'value\'])
            sort += 1
        else:
            form.remove(formItem)
    # print(form)
    return form


# 上传图片到阿里云oss
def uploadPicture(session, image, host):
    url = \'https://{host}/wec-counselor-collector-apps/stu/collector/getStsAccess\'.format(
        host=host)
    res = session.post(url=url, headers={
                       \'content-type\': \'application/json\'}, data=json.dumps({}), verify=not debug)
    datas = res.json().get(\'datas\')
    fileName = datas.get(\'fileName\')
    accessKeyId = datas.get(\'accessKeyId\')
    accessSecret = datas.get(\'accessKeySecret\')
    securityToken = datas.get(\'securityToken\')
    endPoint = datas.get(\'endPoint\')
    bucket = datas.get(\'bucket\')
    bucket = oss2.Bucket(oss2.Auth(access_key_id=accessKeyId,
                                   access_key_secret=accessSecret), endPoint, bucket)
    with open(image, "rb") as f:
        data = f.read()
    bucket.put_object(key=fileName, headers={
                      \'x-oss-security-token\': securityToken}, data=data)
    res = bucket.sign_url(\'PUT\', fileName, 60)
    # log(res)
    return fileName


# 获取图片上传位置
def getPictureUrl(session, fileName, host):
    url = \'https://{host}/wec-counselor-collector-apps/stu/collector/previewAttachment\'.format(
        host=host)
    data = {
        \'ossKey\': fileName
    }
    res = session.post(url=url, headers={
                       \'content-type\': \'application/json\'}, data=json.dumps(data), verify=not debug)
    photoUrl = res.json().get(\'datas\')
    return photoUrl


# 提交表单
def submitForm(formWid, address, collectWid, schoolTaskWid, form, session, host):
    headers = {
        \'User-Agent\': \'Mozilla/5.0 (Linux; Android 4.4.4; OPPO R11 Plus Build/KTU84P) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/33.0.0.0 Safari/537.36 okhttp/3.12.4\',
        \'CpdailyStandAlone\': \'0\',
        \'extension\': \'1\',
        \'Cpdaily-Extension\': \'1wAXD2TvR72sQ8u+0Dw8Dr1Qo1jhbem8Nr+LOE6xdiqxKKuj5sXbDTrOWcaf v1X35UtZdUfxokyuIKD4mPPw5LwwsQXbVZ0Q+sXnuKEpPOtk2KDzQoQ89KVs gslxPICKmyfvEpl58eloAZSZpaLc3ifgciGw+PIdB6vOsm2H6KSbwD8FpjY3 3Tprn2s5jeHOp/3GcSdmiFLYwYXjBt7pwgd/ERR3HiBfCgGGTclquQz+tgjJ PdnDjA==\',
        \'Content-Type\': \'application/json; charset=utf-8\',
        # 请注意这个应该和配置文件中的host保持一致
        \'Host\': host,
        \'Connection\': \'Keep-Alive\',
        \'Accept-Encoding\': \'gzip\'
    }

    # 默认正常的提交参数json
    params = {"formWid": formWid, "address": address, "collectWid": collectWid, "schoolTaskWid": schoolTaskWid,
              "form": form}
    # print(params)
    submitForm = \'https://{host}/wec-counselor-collector-apps/stu/collector/submitForm\'.format(
        host=host)
    r = session.post(url=submitForm, headers=headers,
                     data=json.dumps(params), verify=not debug)
    msg = r.json()[\'message\']
    return msg

title_text = \'今日校园疫结果通知\'

# 发送邮件通知
def sendMessage(send, msg):
    if send != \'\':
        log(\'正在发送邮件通知。。。\')
        res = requests.post(url=\'http://www.zimo.wiki:8080/mail-sender/sendMail\',
                            data={\'title\': title_text, \'content\': getTimeStr() + str(msg), \'to\': send})

        code = res.json()[\'code\']
        if code == 0:
            log(\'发送邮件通知成功。。。\')
        else:
            log(\'发送邮件通知失败。。。\')
            log(res.json())

def sendEmail(send,msg):
    my_sender= config[\'Info\'][\'Email\'][\'account\']   # 发件人邮箱账号
    my_pass = config[\'Info\'][\'Email\'][\'password\']         # 发件人邮箱密码
    my_user = send      # 收件人邮箱账号,我这边发送给自己
    try:
        msg=MIMEText(getTimeStr() + str(msg),\'plain\',\'utf-8\')
        msg[\'From\']=formataddr(["FromRunoob",my_sender])  # 括号里的对应发件人邮箱昵称、发件人邮箱账号
        msg[\'To\']=formataddr(["FK",my_user])              # 括号里的对应收件人邮箱昵称、收件人邮箱账号
        msg[\'Subject\']=title_text               # 邮件的主题,也可以说是标题

        server=smtplib.SMTP_SSL(config[\'Info\'][\'Email\'][\'server\'], config[\'Info\'][\'Email\'][\'port\'])  # 发件人邮箱中的SMTP服务器,端口是25
        server.login(my_sender, my_pass)  # 括号中对应的是发件人邮箱账号、邮箱密码
        server.sendmail(my_sender,[my_user,],msg.as_string())  # 括号中对应的是发件人邮箱账号、收件人邮箱账号、发送邮件
        server.quit()  # 关闭连接
    except Exception:  # 如果 try 中的语句没有执行,则会执行下面的 ret=False
        log("邮件发送失败")
    else: print("邮件发送成功")

# server酱通知
def sendServerChan(msg):
    log(\'正在发送Server酱。。。\')
    res = requests.post(url=\'https://sc.ftqq.com/{0}.send\'.format(config[\'Info\'][\'ServerChan\']),
                            data={\'text\': title_text, \'desp\': getTimeStr() + "\n" + str(msg)})
    code = res.json()[\'errmsg\']
    if code == \'success\':
        log(\'发送Server酱通知成功。。。\')
    else:
        log(\'发送Server酱通知失败。。。\')
        log(\'Server酱返回结果\'+code)

# Qmsg酱通知
def sendQmsgChan(msg):
    log(\'正在发送Qmsg酱。。。\')
    res = requests.post(url=\'https://qmsg.zendee.cn:443/send/{0}\'.format(config[\'Info\'][\'Qsmg\']),
                            data={\'msg\': title_text + \'\n时间:\' + getTimeStr() + "\n 返回结果:" + str(msg)})
    code = res.json()[\'success\']
    if code:
        log(\'发送Qmsg酱通知成功。。。\')
    else:
        log(\'发送Qmsg酱通知失败。。。\')
        log(\'Qmsg酱返回结果\'+code)

# 综合提交
def InfoSubmit(msg, send=None):
    if(None != send):
        if(config[\'Info\'][\'Email\'][\'enable\']): sendEmail(send,msg)
        else: sendMessage(send, msg)
    if(config[\'Info\'][\'ServerChan\']): sendServerChan(msg)
    if(config[\'Info\'][\'Qsmg\']): sendQmsgChan(msg)


def main_handler(event, context):
    try:
        for user in config[\'users\']:
            log(\'当前用户:\' + str(user[\'user\'][\'username\']))
            apis = getCpdailyApis(user)
            log(\'脚本开始执行。。。\')
            log(\'开始模拟登陆。。。\')
            session = getSession(user, apis[\'login-url\'])
            if session != None:
                log(\'模拟登陆成功。。。\')
                log(\'正在查询最新待填写问卷。。。\')
                params = queryForm(session, apis)
                if str(params) == \'None\':
                    log(\'获取最新待填写问卷失败,可能是辅导员还没有发布。。。\')
                    InfoSubmit(\'没有新问卷\')
                    exit(-1)
                log(\'查询最新待填写问卷成功。。。\')
                log(\'正在自动填写问卷。。。\')
                form = fillForm(session, params[\'form\'], apis[\'host\'])
                log(\'填写问卷成功。。。\')
                log(\'正在自动提交。。。\')
                msg = submitForm(params[\'formWid\'], user[\'user\'][\'address\'], params[\'collectWid\'],
                                 params[\'schoolTaskWid\'], form, session, apis[\'host\'])
                if msg == \'SUCCESS\':
                    log(\'自动提交成功!\')
                    InfoSubmit(\'自动提交成功!\', user[\'user\'][\'email\'])
                elif msg == \'该收集已填写无需再次填写\':
                    log(\'今日已提交!\')
                    # InfoSubmit(\'今日已提交!\', user[\'user\'][\'email\'])
                    InfoSubmit(\'今日已提交!\')
                else:
                    log(\'自动提交失败。。。\')
                    log(\'错误是\' + msg)
                    InfoSubmit(\'自动提交失败!错误是\' + msg, user[\'user\'][\'email\'])
                    exit(-1)
            else:
                log(\'模拟登陆失败。。。\')
                log(\'原因可能是学号或密码错误,请检查配置后,重启脚本。。。\')
                exit(-1)
    except Exception as e:
        InfoSubmit("出现问题了!"+str(e))
        raise e
    else:
        return \'success\'


# 配合Windows计划任务等使用
if __name__ == \'__main__\':
    print(main_handler({}, {}))
    # for user in config[\'users\']:
    #     log(getCpdailyApis(user))

版权声明:本文为xiaozhiblogs原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/xiaozhiblogs/p/14167557.html