原创内容,转载请注明来源https://songyue.wang
最近在环境中设置了JumpServer开源堡垒机,来监管开发人员对特权身份(Privileged Identity)的使用。我们计划用JumpServer内置的command filter功能,根据用户角色对特权会话中执行的命令进行限制,例如阻止所有用户使用su/passwd
等命令绕过PAM改密码或者切换账户,限制用户使用shutdown/reboot/poweroff
电源命令等。然而,JumpServer的command filter只能拦截并记录用户行为,无法根据用户行为感知内部人员威胁或PAM密码泄漏等安全事故。因此,我们使用QRadar SIEM进行简单的事件归集,并在QRadar上部署Python脚本自动化响应特权身份管理(PIM)的安全事件,例如中断会话或禁用PAM登录。
添加日志来源
先关闭JumpServer服务器,在config.txt
中加入syslog目的地,然后重新启动JumpServer:
./jmsctl.sh down nano /opt/jumpserver/config/config.txt
SYSLOG_ENABLE=true SYSLOG_ADDR=192.168.32.239:514 SYSLOG_FACILITY=local2
./jmsctl.sh start
在QRadar为JumpServer建立日志来源,并部署变更。QRadar添加自定义日志源的方法可参考这篇博客。
需要特别注意的是,JumpServer的日志payload通常较大(包含远程服务器的信息或命令的具体内容),而JumpServer发送syslog默认使用UDP传输,可能导致QRadar接收时被截断。解决方法有改用TCP发送日志或调大QRadar对UDP payload长度限制。我们使用第二种,Admin => System Settings => Switch to Advance => System Settings =>Max UDP Syslog Payload Length,从1024改成2048即可,完成后部署变更。
QRadar没有预置JumpServer的DSM,需要手动建立payload剖析和QID事件对映,至少要添加一个远程命令拒绝事件。
对session_command_log
的剖析规则如下:
<150>jumpserver: session_command_log - {"account": "jumpserver_pim(jumpserver_pim)", "asset": "JumpServer-PAM(192.168.32.3)", "id": "***", "input": "su", "org_id": "00000000-0000-0000-0000-000000000002", "output": "\u001b[31mCommand `su` is forbidden\n\r\u001b[0m", "remote_addr": "192.168.32.10", "risk_level": {"label": "Reject", "value": 5}, "session": "***", "timestamp": 1743099818, "timestamp_display": "2025/03/28 02:23:38 +0800", "user": "Songyue Wang(aaronwang)"}
事件属性 | 表达式类型 | 表达式 |
Event Category | Regex | <150>jumpserver:\s*(\S+) |
Event ID | JSON | /"risk_level"/"label" |
Destination IP | Regex | "asset"\s:\s"[^"]*((\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}))" |
Source IP | JSON | /"remote_addr" |
Username | JSON | /"user" |
由于不同事件种类的日志格式不同,如果想涵盖所有事件,需要为每个事件属性添加更多的剖析规则,不在这里展开。JumpServer的事件种类及对应的日志payload样例请参照官方文档。
为违规远程命令新建QID记录,高层次及低层次种类可选择可疑 => 使用者行为。
打开一个JumpServer会话,输入一些被禁止的命令测试,确定QRadar可以接收到事件并正确剖析。
自动响应违规操作
设置JumpServer API
新建一个JumpServer管理员SIEM Automation,然后访问JumpServer的Docker容器为其新建永久API key:
docker exec -it jms_core /bin/bash cd /opt/jumpserver/apps python manage.py shell from users.models import User u = User.objects.get(username='siem-automation') u.create_private_token()
访问http://[JumpServer_IP]/api/docs即可查看JumpServer的API文档,点Authorize输入密钥之后可在线测试。
账户封锁脚本
起初我们计划在用户多次尝试违规指令之后,将其账户设为inactive状态。但测试之后发现,虽然用户会立即失去访问JumpServer及开启新会话的权限,但用户仍然可以在会话最长连接时间(max session time)到期前继续在已连接的SFTP/SSH/RDP中进行操作。因此,还需要用/terminal/tasks/kill-session/
这个API主动终止所有活动中的会话。脚本的最终逻辑为:
- QRadar自动传入用户名变量,格式为FULL_NAME(username),例如Songyue Wang(aaronwang)
- 获得用户uuid
- 获得用户当前会话session uuid
- 逐个终止当前会话
- 将用户账户设置为inactive
import requests import json import sys jumpserver_ip = "JUMPSERVER_IP" admin_api_key = "JUMPSERVER_API_KEY" request_headers = { 'Authorization': f'Token {admin_api_key}', 'Content-Type': 'application/json', } def main(): username = sys.argv[1] username = username.split("(")[1].split(")")[0] user_id = get_user_id(username) session_ids = get_current_sessions(user_id) terminate_session(session_ids) disable_user(user_id) def get_user_id(username): url = f"http://{jumpserver_ip}/api/v1/users/users/?username={username}" payload = {} response = requests.request("GET", url, headers=request_headers, data=payload) response = json.loads(response.text)[0] return response["id"] def get_current_sessions(user_id): url = f"http://{jumpserver_ip}/api/v1/terminal/sessions/?user_id={user_id}&is_finished=false" session_ids = [] response = requests.request("GET", url, headers=request_headers) response = json.loads(response.text) for i in response: session_ids.append(i["id"]) return session_ids def terminate_session(session_ids): url = f"http://{jumpserver_ip}/api/v1/terminal/tasks/kill-session/" payload = {} for i in session_ids: payload[i] = "" response = requests.request("POST", url, headers=request_headers, data=json.dumps(payload)) response = json.loads(response.text) return len(response["ok"]) == len(session_ids) def disable_user(user_id): url = f"http://{jumpserver_ip}/api/v1/users/users/{user_id}/" payload = {"is_active": False} response = requests.request("PATCH", url, headers=request_headers, data=json.dumps(payload)) if __name__ == "__main__": main()
登录QRadar SIEM管理端,Admin => Custom Actions => Define Actions => Add上传刚刚的Python脚本,并且设置需要QRadar自动传入的变量(事件中的用户名)。完成后需要部署变更。
可以先将传入变量设置为固定值,在JumpServer里打开一个会话,执行一下测试。可以看到QRadar能成功调用到JumpServer的API(JumpServer会话页面显示Terminated by admin SIEM Automation并且提示重新登录)。
定义攻击规则
Offenses => Rules => Actions =>New Event Rule新建攻击规则。我们的定义是24小时内,同一JumpServer用户在整个PAM系统中一共触发了三次被拒绝的远程命令。
在Rule Response里勾选Execute Custom Action,并选择之前上传的Python脚本。
到这里就可以进行测试了。同一个PAM用户在24小时之内三次尝试违规命令,在第三次命令被拒绝后的十秒左右,用户JumpServer的工作台就会显示所有连接断开,并退出登录。再次尝试登录时,登录页面提示用户名或密码错误(JumpServer不会提示用户被禁用)。
打开QRadar的攻击页面,显示一项Rejected PAM Command攻击,在此可查看相关的事件详情,例如各命令的内容,目标服务器,及尝试执行的时间。
按照QRadar默认设置,如果同一个JumpServer的用户在攻击规则触发的五天内又有三次违规操作,QRadar不会显示新的攻击并执行脚本,而是将新的事件归到已有攻击。因此在恢复用户JumpServer账户前要及时关闭完成调查的攻击,这样封锁脚本才会继续生效。
恢复账户
在完成事件响应并排除隐患后,需要恢复用户对PAM的访问:
- 向用户发送强制重设密码邮件
- 在用户下次登录时强制重设MFA
- 将账户状态重新设为active
设置SMTP服务
System Settings => Notifications填入邮件服务提供商的SMTP服务器地址,邮箱账户及密码。如果JumpServer部署在防火墙后面,记得在防火墙策略放通587(SMTP)或465(SMTPS)端口。
测试服务器可以正常发出邮件。
账户恢复脚本
管理员在JumpServer控制台进行上述操作,或者执行以下脚本。待用户收到邮件,完成密码和MFA充值后,即可正常访问PAM系统。
import requests import json import sys jumpserver_ip = "JUMPSERVER_IP" admin_api_key = "JUMPSERVER_API_KEY" request_headers = { 'Authorization': f'Token {admin_api_key}', 'Content-Type': 'application/json', } def main(): username = input("Username: ") user_id = get_user_id(username) if user_id: try: send_password_reset(user_id) send_mfa_reset(user_id) enable_user(user_id) print(f"User {username} has been successfully recovered.") except Exception as e: print(f"An error occurred while recovering the user: {e}") else: print("User ID not found") def get_user_id(username): url = f"http://{jumpserver_ip}/api/v1/users/users/?username={username}" payload = {} response = requests.request("GET", url, headers=request_headers, data=json.dumps(payload)) response = json.loads(response.text) if len(response) == 0: return False else: return response[0]["id"] def send_password_reset(user_id): url = f"http://{jumpserver_ip}/api/v1/users/users/{user_id}/password/reset/" payload = {} response = requests.request("PATCH", url, headers=request_headers, data=json.dumps(payload)) response = json.loads(response.text) def send_mfa_reset(user_id): url = f"http://{jumpserver_ip}/api/v1/users/users/{user_id}/mfa/reset/" payload = {} response = requests.request("GET", url, headers=request_headers, data=json.dumps(payload)) response = json.loads(response.text) def enable_user(user_id): url = f"http://{jumpserver_ip}/api/v1/users/users/{user_id}/" payload = {"is_active": True} response = requests.request("PATCH", url, headers=request_headers, data=json.dumps(payload)) response = json.loads(response.text) if __name__ == "__main__": main()