Примеры API запросов (получение списка пользователей и устройств, список команд в сессии)
Получение списка целевых систем:
import requests
from config import *
import urllib3
import json
from datetime import datetime
# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class JumpServerAPI:
def __init__(self):
self.base_url = JUMP_SERVER_URL
self.session = requests.Session()
self.session.verify = CERT_PATH
self.token = None
self.headers = {
'Content-Type': 'application/json',
'X-JMS-ORG': ORG_ID
}
def login(self):
"""Authenticate with JumpServer"""
auth_url = f"{self.base_url}/authentication/auth/"
auth_data = {
"username": JUMP_USERNAME,
"password": JUMP_PASSWORD
}
response = self.session.post(auth_url, json=auth_data)
data = response.json()
self.token = data.get('token')
if self.token:
self.headers['Authorization'] = f'Bearer {self.token}'
self.session.headers.update(self.headers)
return True
else:
raise Exception(f"Authentication failed: {data}")
def get_assets(self, limit=100, offset=0):
"""Get list of assets using the assets endpoint"""
assets_url = f"{self.base_url}/assets/assets/"
params = {
'limit': limit,
'offset': offset,
'search': '',
'order': '-date_created'
}
response = self.session.get(assets_url, params=params)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get assets: {response.text}")
def get_supported_resources(self):
url = f"{self.base_url}/resources/"
response = self.session.get(url)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get supported resources: {response.text}")
def format_asset_info(self, asset):
"""Format asset information for display"""
created_date = asset.get('date_created', '')
if created_date:
# Convert from "YYYY/MM/DD HH:MM:SS +HHMM" to "YYYY-MM-DD HH:MM:SS"
created_date = created_date.replace('/', '-').split(' +')[0]
return {
'ID': asset.get('id', 'N/A'),
'Name': asset.get('name', 'N/A'),
'IP': asset.get('ip', 'N/A'),
'Platform': asset.get('platform', 'N/A'),
'Protocol': asset.get('protocol', 'N/A'),
'Status': 'Active' if asset.get('is_active') else 'Inactive',
'Created': created_date if created_date else 'N/A',
'Comment': asset.get('comment', 'N/A'),
'Nodes': asset.get('nodes', 'N/A')
}
def main():
try:
# Initialize API client
api = JumpServerAPI()
# Login to JumpServer
if api.login():
print("Successfully authenticated with JumpServer")
# Get assets list
assets_data = api.get_assets()
# Print assets in a readable format
print("\nAssets List:")
print("-" * 80)
for asset in assets_data.get('results', []):
asset_info = api.format_asset_info(asset)
for key, value in asset_info.items():
print(f"{key}: {value}")
print("-" * 80)
# Print pagination info
total = assets_data.get('count', 0)
print(f"\nTotal assets: {total}")
except Exception as e:
print(f"Error: {str(e)}")
if __name__ == "__main__":
main()
Получение списка пользователей:
import requests
from config import *
import urllib3
# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class JumpServerAPI:
def __init__(self):
self.base_url = JUMP_SERVER_URL
self.session = requests.Session()
self.session.verify = CERT_PATH
self.token = None
self.headers = {
'Content-Type': 'application/json',
'X-JMS-ORG': ORG_ID
}
def login(self):
auth_url = f"{self.base_url}/authentication/auth/"
auth_data = {
"username": JUMP_USERNAME,
"password": JUMP_PASSWORD
}
response = self.session.post(auth_url, json=auth_data)
data = response.json()
self.token = data.get('token')
if self.token:
self.headers['Authorization'] = f'Bearer {self.token}'
self.session.headers.update(self.headers)
return True
else:
raise Exception(f"Authentication failed: {data}")
def get_users(self, limit=100, offset=0):
users_url = f"{self.base_url}/users/users/"
params = {
'limit': limit,
'offset': offset
}
response = self.session.get(users_url, params=params)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get users: {response.text}")
def get_org_name(self):
org_url = f"{self.base_url}/orgs/orgs/current/"
response = self.session.get(org_url)
if response.status_code == 200:
return response.json().get('name', 'N/A')
else:
return 'N/A'
def main():
try:
api = JumpServerAPI()
if api.login():
print("Successfully authenticated with JumpServer")
org_name = api.get_org_name()
users_data = api.get_users(limit=1000)
print(f"\nUsers List (Organization: {org_name}):")
print("-" * 80)
for user in users_data.get('results', []):
username = user.get('username', 'N/A')
name = user.get('name', 'N/A')
roles = ', '.join([role.get('display_name', role.get('name', '')) for role in user.get('org_roles', []) + user.get('system_roles', [])])
print(f"Username: {username}\nName: {name}\nRoles: {roles}\nOrganization: {org_name}")
print("-" * 80)
total = users_data.get('count', 0)
print(f"\nTotal users: {total}")
except Exception as e:
print(f"Error: {str(e)}")
if __name__ == "__main__":
main()
Получение списка команд в сессии:
import requests
from config import *
import sys
import urllib3
from datetime import datetime
# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class JumpServerAPI:
def __init__(self):
self.base_url = JUMP_SERVER_URL
self.session = requests.Session()
self.session.verify = CERT_PATH
self.token = None
self.headers = {
'Content-Type': 'application/json',
'X-JMS-ORG': ORG_ID
}
def login(self):
auth_url = f"{self.base_url}/authentication/auth/"
auth_data = {
"username": JUMP_USERNAME,
"password": JUMP_PASSWORD
}
response = self.session.post(auth_url, json=auth_data)
data = response.json()
self.token = data.get('token')
if self.token:
self.headers['Authorization'] = f'Bearer {self.token}'
self.session.headers.update(self.headers)
return True
else:
raise Exception(f"Authentication failed: {data}")
def get_session_commands(self, session_id, use_alt=False):
if use_alt:
# Альтернативный способ: GET /api/v1/terminal/sessions/{id}/
url = self.base_url.replace('/api/v1', '') + f"/api/v1/terminal/sessions/{session_id}/"
response = self.session.get(url)
if response.status_code == 200:
try:
session_data = response.json()
# Предполагаем, что команды лежат в session_data['commands'] или похожем поле
commands_list = session_data.get('commands') or session_data.get('command_list') or []
if not commands_list:
print(f"No commands found for session {session_id} (via /terminal/sessions/{{id}}/)")
return
print("Session info:")
for key, value in session_data.items():
if key != 'commands' and key != 'command_list':
print(f" {key}: {value}")
print("\nCommands:")
for cmd in commands_list:
ts = cmd.get('timestamp') or cmd.get('time') or cmd.get('datetime')
command = cmd.get('command') or cmd.get('cmd') or cmd.get('input')
if ts and command:
print(f" [{ts}] {command}")
elif command:
print(f" {command}")
return
except Exception as e:
print(f"Error parsing response: {e}")
else:
print(f"Failed to get session: {response.status_code} {response.text}")
else:
# Стандартный способ: GET /api/v1/terminal/commands/?session_id=...
url = self.base_url.replace('/api/v1', '') + "/api/v1/terminal/commands/"
params = {"session_id": session_id}
response = self.session.get(url, params=params)
if response.status_code == 200:
try:
commands = response.json()
# Определяем список команд
if isinstance(commands, dict) and 'results' in commands:
commands_list = commands['results']
elif isinstance(commands, list):
commands_list = commands
else:
commands_list = []
if not commands_list:
print(f"No commands found for session {session_id}")
return
# Информация о сессии (берём из первой команды)
session_info_fields = [
'session_id', 'user', 'user_id', 'asset', 'asset_id', 'system_user', 'system_user_id',
'start_time', 'end_time', 'remote_addr', 'protocol', 'platform', 'org_id'
]
first_cmd = commands_list[0]
print("Session info:")
for field in session_info_fields:
if field in first_cmd:
print(f" {field}: {first_cmd[field]}")
# Анализ времени сессии
start_time = first_cmd.get('start_time')
end_time = first_cmd.get('end_time')
if start_time and end_time:
try:
start_dt = datetime.fromisoformat(start_time.replace('Z', '+00:00'))
end_dt = datetime.fromisoformat(end_time.replace('Z', '+00:00'))
duration = end_dt - start_dt
print(f" session_duration: {duration}")
except Exception:
pass
print("\nCommands:")
for cmd in commands_list:
ts = cmd.get('timestamp') or cmd.get('time') or cmd.get('datetime')
command = cmd.get('command') or cmd.get('cmd') or cmd.get('input')
if ts and command:
print(f" [{ts}] {command}")
elif command:
print(f" {command}")
return
except Exception as e:
print(f"Error parsing response: {e}")
else:
print(f"Failed to get commands: {response.status_code} {response.text}")
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: python get_session_commands.py <session_id> [--alt]")
sys.exit(1)
session_id = sys.argv[1]
use_alt = len(sys.argv) > 2 and sys.argv[2] == '--alt'
api = JumpServerAPI()
try:
if api.login():
api.get_session_commands(session_id, use_alt=use_alt)
except Exception as e:
print(f"Error: {str(e)}")