使用QRadar SIEM自动响应特权访问管理(PAM)的安全问题

原创内容,转载请注明来源https://songyue.wang

最近在环境中设置了JumpServer开源堡垒机,来监管开发人员对特权身份(Privileged Identity)的使用。我们计划用JumpServer内置的command filter功能,根据用户角色对特权会话中执行的命令进行限制,例如阻止所有用户使用su/passwd等命令绕过PAM改密码或者切换账户,限制用户使用shutdown/reboot/poweroff电源命令等。然而,JumpServer的command filter只能拦截并记录用户行为,无法根据用户行为感知内部人员威胁或PAM密码泄漏等安全事故。因此,我们使用QRadar SIEM进行简单的事件归集,并在QRadar上部署Python脚本自动化响应特权身份管理(PIM)的安全事件,例如中断会话或禁用PAM登录。

添加日志来源

先关闭JumpServer服务器,在config.txt中加入syslog目的地,然后重新启动JumpServer:

./jmsctl.sh down
nano /opt/jumpserver/config/config.txt
SYSLOG_ENABLE=true
SYSLOG_ADDR=192.168.32.239:514
SYSLOG_FACILITY=local2
./jmsctl.sh start

在QRadar为JumpServer建立日志来源,并部署变更。QRadar添加自定义日志源的方法可参考这篇博客

需要特别注意的是,JumpServer的日志payload通常较大(包含远程服务器的信息或命令的具体内容),而JumpServer发送syslog默认使用UDP传输,可能导致QRadar接收时被截断。解决方法有改用TCP发送日志或调大QRadar对UDP payload长度限制。我们使用第二种,Admin => System Settings => Switch to Advance => System Settings =>Max UDP Syslog Payload Length,从1024改成2048即可,完成后部署变更。

QRadar没有预置JumpServer的DSM,需要手动建立payload剖析和QID事件对映,至少要添加一个远程命令拒绝事件。

session_command_log的剖析规则如下:

<150>jumpserver: session_command_log - {"account": "jumpserver_pim(jumpserver_pim)", "asset": "JumpServer-PAM(192.168.32.3)", "id": "***", "input": "su", "org_id": "00000000-0000-0000-0000-000000000002", "output": "\u001b[31mCommand `su` is forbidden\n\r\u001b[0m", "remote_addr": "192.168.32.10", "risk_level": {"label": "Reject", "value": 5}, "session": "***", "timestamp": 1743099818, "timestamp_display": "2025/03/28 02:23:38 +0800", "user": "Songyue Wang(aaronwang)"} 
事件属性表达式类型表达式
Event CategoryRegex<150>jumpserver:\s*(\S+)
Event IDJSON/"risk_level"/"label"
Destination IPRegex"asset"\s:\s"[^"]*((\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}))"
Source IPJSON/"remote_addr"
UsernameJSON/"user"

由于不同事件种类的日志格式不同,如果想涵盖所有事件,需要为每个事件属性添加更多的剖析规则,不在这里展开。JumpServer的事件种类及对应的日志payload样例请参照官方文档

为违规远程命令新建QID记录,高层次及低层次种类可选择可疑 => 使用者行为

打开一个JumpServer会话,输入一些被禁止的命令测试,确定QRadar可以接收到事件并正确剖析。

自动响应违规操作

设置JumpServer API

新建一个JumpServer管理员SIEM Automation,然后访问JumpServer的Docker容器为其新建永久API key:

docker exec -it jms_core /bin/bash
cd /opt/jumpserver/apps
python manage.py shell
from users.models import User
u = User.objects.get(username='siem-automation')
u.create_private_token()

访问http://[JumpServer_IP]/api/docs即可查看JumpServer的API文档,点Authorize输入密钥之后可在线测试。

账户封锁脚本

起初我们计划在用户多次尝试违规指令之后,将其账户设为inactive状态。但测试之后发现,虽然用户会立即失去访问JumpServer及开启新会话的权限,但用户仍然可以在会话最长连接时间(max session time)到期前继续在已连接的SFTP/SSH/RDP中进行操作。因此,还需要用/terminal/tasks/kill-session/这个API主动终止所有活动中的会话。脚本的最终逻辑为:

  1. QRadar自动传入用户名变量,格式为FULL_NAME(username),例如Songyue Wang(aaronwang)
  2. 获得用户uuid
  3. 获得用户当前会话session uuid
  4. 逐个终止当前会话
  5. 将用户账户设置为inactive
import requests
import json
import sys

jumpserver_ip = "JUMPSERVER_IP"
admin_api_key = "JUMPSERVER_API_KEY"
request_headers = {
  'Authorization': f'Token {admin_api_key}',
  'Content-Type': 'application/json',
}

def main():
  username = sys.argv[1]
  username = username.split("(")[1].split(")")[0]
  user_id = get_user_id(username)
  session_ids = get_current_sessions(user_id)
  terminate_session(session_ids)
  disable_user(user_id)

def get_user_id(username):
  url = f"http://{jumpserver_ip}/api/v1/users/users/?username={username}"
  payload = {}
  response = requests.request("GET", url, headers=request_headers, data=payload)
  response = json.loads(response.text)[0]
  return response["id"]

def get_current_sessions(user_id):
  url = f"http://{jumpserver_ip}/api/v1/terminal/sessions/?user_id={user_id}&is_finished=false"
  session_ids = []
  response = requests.request("GET", url, headers=request_headers)
  response = json.loads(response.text)
  for i in response:
      session_ids.append(i["id"])

  return session_ids

def terminate_session(session_ids):
  url = f"http://{jumpserver_ip}/api/v1/terminal/tasks/kill-session/"
  payload = {}
  for i in session_ids:
    payload[i] = ""
  response = requests.request("POST", url, headers=request_headers, data=json.dumps(payload))
  response = json.loads(response.text)

  return len(response["ok"]) == len(session_ids)

def disable_user(user_id):
  url = f"http://{jumpserver_ip}/api/v1/users/users/{user_id}/"
  payload = {"is_active": False}
  response = requests.request("PATCH", url, headers=request_headers, data=json.dumps(payload))

if __name__ == "__main__":
  main()

登录QRadar SIEM管理端,Admin => Custom Actions => Define Actions => Add上传刚刚的Python脚本,并且设置需要QRadar自动传入的变量(事件中的用户名)。完成后需要部署变更。

可以先将传入变量设置为固定值,在JumpServer里打开一个会话,执行一下测试。可以看到QRadar能成功调用到JumpServer的API(JumpServer会话页面显示Terminated by admin SIEM Automation并且提示重新登录)。

定义攻击规则

Offenses => Rules => Actions =>New Event Rule新建攻击规则。我们的定义是24小时内,同一JumpServer用户在整个PAM系统中一共触发了三次被拒绝的远程命令。

在Rule Response里勾选Execute Custom Action,并选择之前上传的Python脚本。

到这里就可以进行测试了。同一个PAM用户在24小时之内三次尝试违规命令,在第三次命令被拒绝后的十秒左右,用户JumpServer的工作台就会显示所有连接断开,并退出登录。再次尝试登录时,登录页面提示用户名或密码错误(JumpServer不会提示用户被禁用)。

打开QRadar的攻击页面,显示一项Rejected PAM Command攻击,在此可查看相关的事件详情,例如各命令的内容,目标服务器,及尝试执行的时间。

按照QRadar默认设置,如果同一个JumpServer的用户在攻击规则触发的五天内又有三次违规操作,QRadar不会显示新的攻击并执行脚本,而是将新的事件归到已有攻击。因此在恢复用户JumpServer账户前要及时关闭完成调查的攻击,这样封锁脚本才会继续生效。

恢复账户

在完成事件响应并排除隐患后,需要恢复用户对PAM的访问:

  1. 向用户发送强制重设密码邮件
  2. 在用户下次登录时强制重设MFA
  3. 将账户状态重新设为active

设置SMTP服务

System Settings => Notifications填入邮件服务提供商的SMTP服务器地址,邮箱账户及密码。如果JumpServer部署在防火墙后面,记得在防火墙策略放通587(SMTP)或465(SMTPS)端口。

测试服务器可以正常发出邮件。

账户恢复脚本

管理员在JumpServer控制台进行上述操作,或者执行以下脚本。待用户收到邮件,完成密码和MFA充值后,即可正常访问PAM系统。

import requests
import json
import sys

jumpserver_ip = "JUMPSERVER_IP"
admin_api_key = "JUMPSERVER_API_KEY"
request_headers = {
  'Authorization': f'Token {admin_api_key}',
  'Content-Type': 'application/json',
}

def main():
  username = input("Username: ")
  user_id = get_user_id(username)
  if user_id:
    try:
      send_password_reset(user_id)
      send_mfa_reset(user_id)
      enable_user(user_id)
      print(f"User {username} has been successfully recovered.")
    except Exception as e:
      print(f"An error occurred while recovering the user: {e}")
  else:
    print("User ID not found")

def get_user_id(username):
  url = f"http://{jumpserver_ip}/api/v1/users/users/?username={username}"
  payload = {}
  response = requests.request("GET", url, headers=request_headers, data=json.dumps(payload))
  response = json.loads(response.text)
  if len(response) == 0:
    return False
  else:
    return response[0]["id"]

def send_password_reset(user_id):
  url = f"http://{jumpserver_ip}/api/v1/users/users/{user_id}/password/reset/"
  payload = {}
  response = requests.request("PATCH", url, headers=request_headers, data=json.dumps(payload))
  response = json.loads(response.text)

def send_mfa_reset(user_id):
  url = f"http://{jumpserver_ip}/api/v1/users/users/{user_id}/mfa/reset/"
  payload = {}
  response = requests.request("GET", url, headers=request_headers, data=json.dumps(payload))
  response = json.loads(response.text)

def enable_user(user_id):
  url = f"http://{jumpserver_ip}/api/v1/users/users/{user_id}/"
  payload = {"is_active": True}
  response = requests.request("PATCH", url, headers=request_headers, data=json.dumps(payload))
  response = json.loads(response.text)

if __name__ == "__main__":
  main()
Previous
Next