原创内容,转载请注明来源https://songyue.wang
最近在环境中设置了JumpServer开源堡垒机,来监管开发人员对特权身份(Privileged Identity)的使用。我们计划用JumpServer内置的command filter功能,根据用户角色对特权会话中执行的命令进行限制,例如阻止所有用户使用su/passwd等命令绕过PAM改密码或者切换账户,限制用户使用shutdown/reboot/poweroff电源命令等。然而,JumpServer的command filter只能拦截并记录用户行为,无法根据用户行为感知内部人员威胁或PAM密码泄漏等安全事故。因此,我们使用QRadar SIEM进行简单的事件归集,并在QRadar上部署Python脚本自动化响应特权身份管理(PIM)的安全事件,例如中断会话或禁用PAM登录。
添加日志来源
先关闭JumpServer服务器,在config.txt中加入syslog目的地,然后重新启动JumpServer:
./jmsctl.sh down nano /opt/jumpserver/config/config.txt
SYSLOG_ENABLE=true SYSLOG_ADDR=192.168.32.239:514 SYSLOG_FACILITY=local2
./jmsctl.sh start
在QRadar为JumpServer建立日志来源,并部署变更。QRadar添加自定义日志源的方法可参考这篇博客。

需要特别注意的是,JumpServer的日志payload通常较大(包含远程服务器的信息或命令的具体内容),而JumpServer发送syslog默认使用UDP传输,可能导致QRadar接收时被截断。解决方法有改用TCP发送日志或调大QRadar对UDP payload长度限制。我们使用第二种,Admin => System Settings => Switch to Advance => System Settings =>Max UDP Syslog Payload Length,从1024改成2048即可,完成后部署变更。

QRadar没有预置JumpServer的DSM,需要手动建立payload剖析和QID事件对映,至少要添加一个远程命令拒绝事件。

对session_command_log的剖析规则如下:
<150>jumpserver: session_command_log - {"account": "jumpserver_pim(jumpserver_pim)", "asset": "JumpServer-PAM(192.168.32.3)", "id": "***", "input": "su", "org_id": "00000000-0000-0000-0000-000000000002", "output": "\u001b[31mCommand `su` is forbidden\n\r\u001b[0m", "remote_addr": "192.168.32.10", "risk_level": {"label": "Reject", "value": 5}, "session": "***", "timestamp": 1743099818, "timestamp_display": "2025/03/28 02:23:38 +0800", "user": "Songyue Wang(aaronwang)"}
| 事件属性 | 表达式类型 | 表达式 |
| Event Category | Regex | <150>jumpserver:\s*(\S+) |
| Event ID | JSON | /"risk_level"/"label" |
| Destination IP | Regex | "asset"\s:\s"[^"]*((\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}))" |
| Source IP | JSON | /"remote_addr" |
| Username | JSON | /"user" |
由于不同事件种类的日志格式不同,如果想涵盖所有事件,需要为每个事件属性添加更多的剖析规则,不在这里展开。JumpServer的事件种类及对应的日志payload样例请参照官方文档。
为违规远程命令新建QID记录,高层次及低层次种类可选择可疑 => 使用者行为。

打开一个JumpServer会话,输入一些被禁止的命令测试,确定QRadar可以接收到事件并正确剖析。


自动响应违规操作
设置JumpServer API
新建一个JumpServer管理员SIEM Automation,然后访问JumpServer的Docker容器为其新建永久API key:

docker exec -it jms_core /bin/bash cd /opt/jumpserver/apps python manage.py shell from users.models import User u = User.objects.get(username='siem-automation') u.create_private_token()

访问http://[JumpServer_IP]/api/docs即可查看JumpServer的API文档,点Authorize输入密钥之后可在线测试。

账户封锁脚本
起初我们计划在用户多次尝试违规指令之后,将其账户设为inactive状态。但测试之后发现,虽然用户会立即失去访问JumpServer及开启新会话的权限,但用户仍然可以在会话最长连接时间(max session time)到期前继续在已连接的SFTP/SSH/RDP中进行操作。因此,还需要用/terminal/tasks/kill-session/这个API主动终止所有活动中的会话。脚本的最终逻辑为:
- QRadar自动传入用户名变量,格式为FULL_NAME(username),例如Songyue Wang(aaronwang)
- 获得用户uuid
- 获得用户当前会话session uuid
- 逐个终止当前会话
- 将用户账户设置为inactive
import requests
import json
import sys
jumpserver_ip = "JUMPSERVER_IP"
admin_api_key = "JUMPSERVER_API_KEY"
request_headers = {
'Authorization': f'Token {admin_api_key}',
'Content-Type': 'application/json',
}
def main():
username = sys.argv[1]
username = username.split("(")[1].split(")")[0]
user_id = get_user_id(username)
session_ids = get_current_sessions(user_id)
terminate_session(session_ids)
disable_user(user_id)
def get_user_id(username):
url = f"http://{jumpserver_ip}/api/v1/users/users/?username={username}"
payload = {}
response = requests.request("GET", url, headers=request_headers, data=payload)
response = json.loads(response.text)[0]
return response["id"]
def get_current_sessions(user_id):
url = f"http://{jumpserver_ip}/api/v1/terminal/sessions/?user_id={user_id}&is_finished=false"
session_ids = []
response = requests.request("GET", url, headers=request_headers)
response = json.loads(response.text)
for i in response:
session_ids.append(i["id"])
return session_ids
def terminate_session(session_ids):
url = f"http://{jumpserver_ip}/api/v1/terminal/tasks/kill-session/"
payload = {}
for i in session_ids:
payload[i] = ""
response = requests.request("POST", url, headers=request_headers, data=json.dumps(payload))
response = json.loads(response.text)
return len(response["ok"]) == len(session_ids)
def disable_user(user_id):
url = f"http://{jumpserver_ip}/api/v1/users/users/{user_id}/"
payload = {"is_active": False}
response = requests.request("PATCH", url, headers=request_headers, data=json.dumps(payload))
if __name__ == "__main__":
main()
登录QRadar SIEM管理端,Admin => Custom Actions => Define Actions => Add上传刚刚的Python脚本,并且设置需要QRadar自动传入的变量(事件中的用户名)。完成后需要部署变更。

可以先将传入变量设置为固定值,在JumpServer里打开一个会话,执行一下测试。可以看到QRadar能成功调用到JumpServer的API(JumpServer会话页面显示Terminated by admin SIEM Automation并且提示重新登录)。




定义攻击规则
Offenses => Rules => Actions =>New Event Rule新建攻击规则。我们的定义是24小时内,同一JumpServer用户在整个PAM系统中一共触发了三次被拒绝的远程命令。

在Rule Response里勾选Execute Custom Action,并选择之前上传的Python脚本。

到这里就可以进行测试了。同一个PAM用户在24小时之内三次尝试违规命令,在第三次命令被拒绝后的十秒左右,用户JumpServer的工作台就会显示所有连接断开,并退出登录。再次尝试登录时,登录页面提示用户名或密码错误(JumpServer不会提示用户被禁用)。


打开QRadar的攻击页面,显示一项Rejected PAM Command攻击,在此可查看相关的事件详情,例如各命令的内容,目标服务器,及尝试执行的时间。

按照QRadar默认设置,如果同一个JumpServer的用户在攻击规则触发的五天内又有三次违规操作,QRadar不会显示新的攻击并执行脚本,而是将新的事件归到已有攻击。因此在恢复用户JumpServer账户前要及时关闭完成调查的攻击,这样封锁脚本才会继续生效。

恢复账户
在完成事件响应并排除隐患后,需要恢复用户对PAM的访问:
- 向用户发送强制重设密码邮件
- 在用户下次登录时强制重设MFA
- 将账户状态重新设为active
设置SMTP服务
System Settings => Notifications填入邮件服务提供商的SMTP服务器地址,邮箱账户及密码。如果JumpServer部署在防火墙后面,记得在防火墙策略放通587(SMTP)或465(SMTPS)端口。

测试服务器可以正常发出邮件。

账户恢复脚本
管理员在JumpServer控制台进行上述操作,或者执行以下脚本。待用户收到邮件,完成密码和MFA充值后,即可正常访问PAM系统。
import requests
import json
import sys
jumpserver_ip = "JUMPSERVER_IP"
admin_api_key = "JUMPSERVER_API_KEY"
request_headers = {
'Authorization': f'Token {admin_api_key}',
'Content-Type': 'application/json',
}
def main():
username = input("Username: ")
user_id = get_user_id(username)
if user_id:
try:
send_password_reset(user_id)
send_mfa_reset(user_id)
enable_user(user_id)
print(f"User {username} has been successfully recovered.")
except Exception as e:
print(f"An error occurred while recovering the user: {e}")
else:
print("User ID not found")
def get_user_id(username):
url = f"http://{jumpserver_ip}/api/v1/users/users/?username={username}"
payload = {}
response = requests.request("GET", url, headers=request_headers, data=json.dumps(payload))
response = json.loads(response.text)
if len(response) == 0:
return False
else:
return response[0]["id"]
def send_password_reset(user_id):
url = f"http://{jumpserver_ip}/api/v1/users/users/{user_id}/password/reset/"
payload = {}
response = requests.request("PATCH", url, headers=request_headers, data=json.dumps(payload))
response = json.loads(response.text)
def send_mfa_reset(user_id):
url = f"http://{jumpserver_ip}/api/v1/users/users/{user_id}/mfa/reset/"
payload = {}
response = requests.request("GET", url, headers=request_headers, data=json.dumps(payload))
response = json.loads(response.text)
def enable_user(user_id):
url = f"http://{jumpserver_ip}/api/v1/users/users/{user_id}/"
payload = {"is_active": True}
response = requests.request("PATCH", url, headers=request_headers, data=json.dumps(payload))
response = json.loads(response.text)
if __name__ == "__main__":
main()

