使用云函数更新TEO节点IP到安全组

TEO 的节点 IP 较多,手动更新不现实;TEO 提供的“源站保护”功能仅适用于部分套餐,其原理大概是只调度域名到固定的节点去回源,如果节点更新后,需要用户确认,才会调度到新的节点,不够灵活。

通过使用云函数,可以实现自动拉取 TEO 的所有节点 IP 并更新到安全组中,自行管理更新的节奏。

Step1. 创建子用户

在这里,我们依赖子用户去调用腾讯云 API,用于更新安全组,请创建子用户并赋予安全组更新权限,保存 SecretID / SecretKey 备用。

图 1.1 创建子用户
图1.2 授予用户权限
图1.3 新建权限策略

Step2. 创建云函数

使用“从头开始”创建云函数,选择 Python3.10

代码使用如下内容

代码语言:python
代码运行次数:0
复制
Cloud Studio 代码运行
# -*- coding: utf8 -*-

import os
import sys
from typing import List, Tuple, Literal
import json
from tencentcloud.common import credential
from tencentcloud.vpc.v20170312 import vpc_client, models
import httpx
import logging

logger = logging.getLogger()

SG_REGION = "" # 替换为你的安全组所在地域,如 ap-beijing 表示北京
IPV4_SG_ID = "" # 替换为你的IPV4安全组ID
IPV6_SG_ID = "" # 替换为你的IPV6安全组ID
MAX_IPS = 400 # 请提前申请配额到 400,否则一个安全组装不下
AKID = os.environ.get('APP_TENCENTCLOUD_SECRETID', "")
AKSK = os.environ.get('APP_TENCENTCLOUD_SECRETKEY', "")

class SGReplace:
def run(self) -> None:
ipv4_ips, ipv6_ips = self.get_eo_ips()
if len(ipv4_ips) > MAX_IPS or len(ipv6_ips) > MAX_IPS:
raise AssertionError("Too many IP addresses")
client = vpc_client.VpcClient(credential.Credential(AKID, AKSK), SG_REGION)

    # update ipv4 sg
    ipv4_ips.sort()
    ipv4_params = self.build_params(IPV4_SG_ID, ipv4_ips, "ipv4")
    try:
        ipv4_req = models.ModifySecurityGroupPoliciesRequest()
        ipv4_req.from_json_string(json.dumps(ipv4_params))
        ipv4_resp = client.ModifySecurityGroupPolicies(ipv4_req)
        logger.info("IPV4 Response: %s", ipv4_resp.to_json_string())
    except Exception as err:
        logger.exception("Call API Failed: %s", err)

    # update ipv6 sg
    ipv6_ips.sort()
    ipv6_params = self.build_params(IPV6_SG_ID, ipv6_ips, "ipv6")
    try:
        ipv6_req = models.ModifySecurityGroupPoliciesRequest()
        ipv6_req.from_json_string(json.dumps(ipv6_params))
        ipv6_resp = client.ModifySecurityGroupPolicies(ipv6_req)
        logger.info("IPV6 Response: %s", ipv6_resp.to_json_string())
    except Exception as err:
        logger.exception("Call API Failed: %s", err)

def build_params(self, sg_id: str, ips: List[str], ip_type: Literal["ipv4", "ipv6"]) -> dict:
    block = "CidrBlock" if ip_type == "ipv4" else "Ipv6CidrBlock"
    return {
        "SecurityGroupId": sg_id,
        "SecurityGroupPolicySet": {
            "Ingress": [
                {
                    "Action": "ACCEPT",
                    "Port": "443",
                    "Protocol": "TCP",
                    block: ip,
                }
                for ip in ips if ip
            ],
            "Egress": [
                {
                    "Action": "accept",
                    "Protocol": "ALL",
                    "Port": "ALL",
                    "CidrBlock": "0.0.0.0/0"
                },
                {
                    "Action": "accept",
                    "Protocol": "ALL",
                    "Port": "ALL",
                    "Ipv6CidrBlock": "::/0"
                }
            ]
        }
    }

def get_eo_ips(self) -> Tuple[List[str], List[str]]:
    client = httpx.Client()
    try:
        response = client.get("https://api.edgeone.ai/ips")
        response.raise_for_status()
    except Exception as err:
        logger.exception("Load TEO IPs Failed: %s", err)
        return [], []
    finally:
        client.close()
    ips = (ip for ip in response.content.decode().split("\n"))
    ipv4_ips = set()
    ipv6_ips = set()
    for ip in ips:
        if not ip:
            continue
        if ":" in ip:
            ipv6_ips.add(ip)
            continue
        ipv4_ips.add(ip)
    del ips
    logger.info("Load TEO IPs Success: %d %d", len(ipv4_ips), len(ipv6_ips))
    return list(ipv4_ips), list(ipv6_ips)

def main_handler(*args, **kwargs):
logger.info("TEONodeIPSG Start")
SGReplace().run()
logger.info("TEONodeIPSG ENd")

Step3. 配置环境变量

在环境变量中,需要配置下面的两个变量

代码语言:txt
复制
APP_TENCENTCLOUD_SECRETID=<刚创建的用户的 Secret ID>
APP_TENCENTCLOUD_SECRETKEY=<刚创建的用户的 Secret Key>

Step4. 配置触发器

触发器决定了多久运行一次,建议配置为 30min 或者 1h 运行一次