使用 VPN 混淆网络流量时,AWS CLI 脚本更改 EC2 的安全组以匹配当前分配的 IP 地址?

AWS CLI script to change security group of EC2 to match currently assigned IP address when using VPN to obfuscate network traffic?

如果我使用的是动态更改我的 IP 地址的 VPN 服务,有没有办法编写一个脚本,我可以 运行 使用 AWS CLI 来更新 IP 地址(由那个特定时刻的 VPN)在我的 AWS VPC 安全组上,它允许通过 TCP/Port 22?

SSH 访问 EC2 实例

此外,当 VPN 提供商更改我的 IP 地址时,我如何自动禁用它?

Tl;DR 您可以将规则的源配置为 CIDR 块,与您的 VPN 地址相匹配 space.

安全组充当您实例的虚拟防火墙,以控制入站和出站流量。对于每个安全组,您添加控制实例入站流量的规则,以及一组单独的控制出站流量的规则。

这里我们对入站规则感兴趣。每条规则都包含流量的来源和目标端口或端口范围。源可以是另一个安全组、IPv4 或 IPv6 CIDR 块、单个 IPv4 或 IPv6 地址或前缀列表 ID。

当您将 CIDR 块指定为规则的源时,将允许来自指定地址的指定协议和端口的流量。如果指定单个 IPv4 地址,请使用 /32 前缀长度指定地址。例如 127.0.0.1/32 将只允许来自该特定 IP 的访问。但是,如果我们将 CIDR 块定义为 127.0.0.1/8,则所有以 127. 开头的 IP 地址都将被允许。例如,127.123.123.123 将被允许。

因此,您可以将规则的来源配置为 CIDR 块,它与您的 VPN 地址 space。

最好的,斯特凡

这是我用来将当前 IP 地址添加到安全组的 shell 脚本:

IP=`curl -s http://whatismyip.akamai.com/`
aws ec2 authorize-security-group-ingress --group-name "VPN-SSH-SG" --protocol tcp --port 22   --cidr $IP/32 --output text

您需要为您的安全组名称更新它。

这是一个 Python 脚本,用于自动删除安全组中的所有规则:

#!/usr/bin/env python

import boto3

GROUP_NAME = "VPN-SSH-SG"

# Connect to the Amazon EC2 service
ec2 = boto3.resource('ec2')

# Retrieve the security group
security_groups = ec2.security_groups.filter(Filters=[{'Name':'group-name', 'Values':[GROUP_NAME]}])

# Delete all rules in the group
for group in security_groups:
    group.revoke_ingress(IpPermissions = group.ip_permissions)

您可以将它们组合在一起以清除现有条目,然后添加您当前的 IP 地址。

根据上面 John Rotenstein 的回答,我对其进行了修改并将其写入 Lambda 函数。现在效果很好!希望这对其他人有帮助。

import boto3
import urllib3

# %% Description
'''Update the IP address to connect with a VPN that
dynamically changes user IPs.'''
    
def lambda_handler(event, context):
    
    # %% Get current IP in CIDR notation
    http = urllib3.PoolManager()
    r = http.request('GET', 'http://checkip.amazonaws.com/')
    ip = r.data.decode("utf-8").strip('\n')
    CidrIp_new = ip + '/32'
    
    # %% Revoke old permissions
    sg = boto3.resource('ec2').SecurityGroup('sg-ce95e5d5')
    permissions = sg.ip_permissions
    FromPort = permissions[0].get('FromPort')
    IpProtocol = permissions[0].get('IpProtocol')
    CidrIp = permissions[0].get('IpRanges')[0].get('CidrIp')
    ToPort = permissions[0].get('ToPort')
    revoke_resp = sg.revoke_ingress(
        FromPort=FromPort, IpProtocol=IpProtocol,
        CidrIp=CidrIp, ToPort=ToPort
        )
    
    # %% Authorize new sg parameters
    auth_resp = sg.authorize_ingress(
        FromPort=FromPort, IpProtocol=IpProtocol,
        CidrIp=CidrIp_new, ToPort=ToPort
        )
    
    return 'Revoke Status', revoke_resp, 'Authorize Status', auth_resp

如果您有许多环境和许多安全组,这里是 python 脚本,用于在您的家庭 ip 更改后全部更新:

import boto3
import urllib3
import sys
from datetime import datetime

CURRENT_IP = "4x.1xx.2xx.2xx/32"  # Fixed old ip

GROUP_MAP = {
    "profile1": [
        {
            "GroupId": ["sg-xxx"],
            "Description": "Description",
            "FromPort": 443
            # "ToPort": optional to port
        }
    ],
    "profile2": [
        {
            "GroupId": ["sg-xxx"],
            "Description": "proxy",
            "FromPort": 22
        }
    ]
}


def get_new_ip():
    http = urllib3.PoolManager()
    r = http.request('GET', 'http://checkip.amazonaws.com/')
    ip = r.data.decode("utf-8").strip('\n')
    return ip + '/32'


def update_for_profile(data, old_ip, new_ip, profile_name):
    print(f"{profile_name}: Updating for {data.get('Description', '')} {data['GroupId']}")
    ec2 = boto3.resource('ec2')
    groups = ec2.security_groups.filter(Filters=[{'Name': 'group-id', 'Values': data['GroupId']}])
    from_port = data["FromPort"]
    to_port = data.get('ToPort', from_port)
    ip_protocol = data.get('ip_protocol', 'tcp')
    cidr_ip = old_ip
    description = f"Added {datetime.today().strftime('%Y-%m-%d')}"
    try:
        for group in groups:
            group.revoke_ingress(
                FromPort=from_port,
                IpProtocol=ip_protocol,
                CidrIp=cidr_ip,
                ToPort=to_port
            )
            group.authorize_ingress(
                IpPermissions=[dict(
                    FromPort=from_port,
                    IpProtocol=ip_protocol,
                    ToPort=to_port,
                    IpRanges=[{
                        'CidrIp': new_ip,
                        'Description': description
                    }])
                ]
            )
    except Exception as exc:
        print(exc)


def update_my_ip(old_ip, new_ip):
    for profile_name, mapping in GROUP_MAP.items():
        boto3.setup_default_session(profile_name=profile_name)
        for item in mapping:
            update_for_profile(item, old_ip, new_ip, profile_name)


if __name__ == "__main__":
    new_ip = get_new_ip()
    old_ip = sys.argv[1] if len(sys.argv) > 1 else CURRENT_IP
    if not old_ip:
        print("There's no old ip")
    print(f"Updating my ip from {old_ip} to {new_ip}")
    update_my_ip(old_ip, new_ip)

感谢您上面的回答