Перейти к основному контенту

Примеры API-запросов (получение списка пользователей и устройств, список команд в сессии)

Получение списка целевых систем:
import requests
from config import *
import urllib3
import json
from datetime import datetime

# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

class JumpServerAPI:
    """Minimal JumpServer REST client used to list assets.

    Connection settings (``JUMP_SERVER_URL``, ``CERT_PATH``, ``ORG_ID``,
    ``JUMP_USERNAME``, ``JUMP_PASSWORD``) are read from module-level names;
    this class does not define them itself.
    """

    def __init__(self):
        """Prepare the HTTP session; no request is sent until ``login``."""
        self.base_url = JUMP_SERVER_URL
        self.session = requests.Session()
        # Value handed to requests as the session's ``verify`` setting.
        self.session.verify = CERT_PATH
        self.token = None
        # Copied onto the session only after a successful login().
        self.headers = {
            'Content-Type': 'application/json',
            'X-JMS-ORG': ORG_ID
        }

    def login(self):
        """Authenticate with JumpServer and install the bearer token.

        Posts the configured username/password to ``/authentication/auth/``
        and reads the ``token`` key from the JSON reply.

        Returns:
            True when a token was received.

        Raises:
            Exception: when the reply carries no (or an empty) ``token``;
                the decoded reply is included in the message.
        """
        auth_url = f"{self.base_url}/authentication/auth/"
        auth_data = {
            "username": JUMP_USERNAME,
            "password": JUMP_PASSWORD
        }
        response = self.session.post(auth_url, json=auth_data)
        data = response.json()
        self.token = data.get('token')
        if not self.token:
            raise Exception(f"Authentication failed: {data}")
        self.headers['Authorization'] = f'Bearer {self.token}'
        self.session.headers.update(self.headers)
        return True

    def get_assets(self, limit=100, offset=0):
        """Fetch one page of assets from ``/assets/assets/``.

        Args:
            limit: value sent as the ``limit`` query parameter.
            offset: value sent as the ``offset`` query parameter.

        Returns:
            The decoded JSON body of a 200 response.

        Raises:
            Exception: on any other status code, with the response text.
        """
        assets_url = f"{self.base_url}/assets/assets/"
        params = {
            'limit': limit,
            'offset': offset,
            'search': '',
            'order': '-date_created'
        }

        response = self.session.get(assets_url, params=params)
        if response.status_code == 200:
            return response.json()
        raise Exception(f"Failed to get assets: {response.text}")

    def get_supported_resources(self):
        """Fetch ``/resources/`` and return its decoded JSON body.

        Raises:
            Exception: on any status code other than 200.
        """
        url = f"{self.base_url}/resources/"
        response = self.session.get(url)
        if response.status_code == 200:
            return response.json()
        raise Exception(f"Failed to get supported resources: {response.text}")

    def format_asset_info(self, asset):
        """Build a flat, printable summary of one asset record.

        Args:
            asset: mapping describing an asset; missing keys fall back to
                ``'N/A'`` (``is_active`` falls back to ``'Inactive'``).

        Returns:
            A dict with the keys ID, Name, IP, Platform, Protocol, Status,
            Created, Comment and Nodes. ``Created`` is the ``date_created``
            value with ``/`` replaced by ``-`` and a trailing UTC offset
            removed.
        """
        created_date = asset.get('date_created', '')
        if created_date:
            # "YYYY/MM/DD HH:MM:SS +HHMM" -> "YYYY-MM-DD HH:MM:SS"
            created_date = created_date.replace('/', '-').split(' +')[0]
            # A negative offset ("... -0500") is not matched by ' +', so strip
            # it separately. The tail must be '-' followed by digits (with an
            # optional ':'), which a date or time component never is.
            stamp, sep, tail = created_date.rpartition(' ')
            if sep and tail.startswith('-') and tail[1:].replace(':', '').isdigit():
                created_date = stamp

        return {
            'ID': asset.get('id', 'N/A'),
            'Name': asset.get('name', 'N/A'),
            'IP': asset.get('ip', 'N/A'),
            'Platform': asset.get('platform', 'N/A'),
            'Protocol': asset.get('protocol', 'N/A'),
            'Status': 'Active' if asset.get('is_active') else 'Inactive',
            'Created': created_date if created_date else 'N/A',
            'Comment': asset.get('comment', 'N/A'),
            'Nodes': asset.get('nodes', 'N/A')
        }

def main():
    """Log in, print the assets returned by ``get_assets()``, then the total.

    Every asset is shown as ``key: value`` lines followed by a rule of 80
    dashes. Any exception raised along the way is reported on stdout as
    ``Error: ...`` instead of propagating.
    """
    separator = "-" * 80
    try:
        client = JumpServerAPI()

        # login() raises on failure, so the banner only appears on success.
        if client.login():
            print("Successfully authenticated with JumpServer")

        page = client.get_assets()

        print("\nAssets List:")
        print(separator)
        for record in page.get('results', []):
            summary = client.format_asset_info(record)
            for key, value in summary.items():
                print(f"{key}: {value}")
            print(separator)

        # 'count' comes from the response body, not from the rows printed above.
        total = page.get('count', 0)
        print(f"\nTotal assets: {total}")
    except Exception as exc:
        print(f"Error: {str(exc)}")


if __name__ == "__main__":
    main()
Получение списка пользователей:

import requests
from config import *
import urllib3

# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

class JumpServerAPI:
    """Small JumpServer REST client used to list users.

    Connection settings (``JUMP_SERVER_URL``, ``CERT_PATH``, ``ORG_ID``,
    ``JUMP_USERNAME``, ``JUMP_PASSWORD``) are read from module-level names.
    """

    def __init__(self):
        """Create the HTTP session; nothing is sent until ``login``."""
        self.base_url = JUMP_SERVER_URL
        http = requests.Session()
        http.verify = CERT_PATH
        self.session = http
        self.token = None
        # Installed on the session by login() once a token is known.
        self.headers = {'Content-Type': 'application/json', 'X-JMS-ORG': ORG_ID}

    def login(self):
        """Exchange the configured credentials for a bearer token.

        Returns:
            True once the token has been stored and the session headers
            updated.

        Raises:
            Exception: if the JSON reply has no (or an empty) ``token``.
        """
        endpoint = f"{self.base_url}/authentication/auth/"
        credentials = {"username": JUMP_USERNAME, "password": JUMP_PASSWORD}
        payload = self.session.post(endpoint, json=credentials).json()
        self.token = payload.get('token')
        if not self.token:
            raise Exception(f"Authentication failed: {payload}")
        self.headers['Authorization'] = f'Bearer {self.token}'
        self.session.headers.update(self.headers)
        return True

    def get_users(self, limit=100, offset=0):
        """Return the decoded JSON of one ``/users/users/`` page.

        Args:
            limit: value sent as the ``limit`` query parameter.
            offset: value sent as the ``offset`` query parameter.

        Raises:
            Exception: when the status code is not 200.
        """
        endpoint = f"{self.base_url}/users/users/"
        reply = self.session.get(endpoint, params={'limit': limit, 'offset': offset})
        if reply.status_code != 200:
            raise Exception(f"Failed to get users: {reply.text}")
        return reply.json()

    def get_org_name(self):
        """Return the ``name`` from ``/orgs/orgs/current/``.

        Falls back to ``'N/A'`` when the status code is not 200 or the reply
        has no ``name`` key; a non-200 status never raises here.
        """
        reply = self.session.get(f"{self.base_url}/orgs/orgs/current/")
        if reply.status_code != 200:
            return 'N/A'
        return reply.json().get('name', 'N/A')

def main():
    """Log in and print username, name and roles for each returned user.

    Requests up to 1000 users in a single call. Each user's roles are the
    ``org_roles`` entries followed by the ``system_roles`` entries, shown by
    ``display_name`` (falling back to ``name``). Any exception is reported
    on stdout as ``Error: ...`` instead of propagating.
    """
    separator = "-" * 80
    try:
        client = JumpServerAPI()
        # login() raises on failure, so the banner only appears on success.
        if client.login():
            print("Successfully authenticated with JumpServer")

        org_name = client.get_org_name()
        listing = client.get_users(limit=1000)

        print(f"\nUsers List (Organization: {org_name}):")
        print(separator)
        for user in listing.get('results', []):
            username = user.get('username', 'N/A')
            name = user.get('name', 'N/A')

            # Organization-level roles first, then system-level ones.
            role_entries = user.get('org_roles', []) + user.get('system_roles', [])
            labels = []
            for role in role_entries:
                labels.append(role.get('display_name', role.get('name', '')))
            roles = ', '.join(labels)

            print(f"Username: {username}\nName: {name}\nRoles: {roles}\nOrganization: {org_name}")
            print(separator)

        total = listing.get('count', 0)
        print(f"\nTotal users: {total}")
    except Exception as exc:
        print(f"Error: {str(exc)}")


if __name__ == "__main__":
    main()
Получение списка команд в сессии:
import requests
from config import *
import sys
import urllib3
from datetime import datetime

# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

class JumpServerAPI:
    """JumpServer REST client that prints the commands recorded for a session.

    Connection settings (``JUMP_SERVER_URL``, ``CERT_PATH``, ``ORG_ID``,
    ``JUMP_USERNAME``, ``JUMP_PASSWORD``) are read from module-level names.
    """

    # Key names probed, in order, for a command's timestamp.
    _TIMESTAMP_KEYS = ('timestamp', 'time', 'datetime')
    # Key names probed, in order, for a command's text.
    _COMMAND_KEYS = ('command', 'cmd', 'input')
    # Keys of the detail payload that hold the command list itself.
    _COMMAND_LIST_KEYS = ('commands', 'command_list')
    # Session-level fields echoed from the first command record when present.
    _SESSION_INFO_FIELDS = (
        'session_id', 'user', 'user_id', 'asset', 'asset_id', 'system_user', 'system_user_id',
        'start_time', 'end_time', 'remote_addr', 'protocol', 'platform', 'org_id',
    )

    def __init__(self):
        """Prepare the HTTP session; no request is sent until ``login``."""
        self.base_url = JUMP_SERVER_URL
        self.session = requests.Session()
        self.session.verify = CERT_PATH
        self.token = None
        # Copied onto the session only after a successful login().
        self.headers = {
            'Content-Type': 'application/json',
            'X-JMS-ORG': ORG_ID
        }

    def login(self):
        """Authenticate and install the bearer token on the session.

        Returns:
            True when a token was received.

        Raises:
            Exception: when the JSON reply has no (or an empty) ``token``.
        """
        auth_url = f"{self.base_url}/authentication/auth/"
        auth_data = {
            "username": JUMP_USERNAME,
            "password": JUMP_PASSWORD
        }
        response = self.session.post(auth_url, json=auth_data)
        data = response.json()
        self.token = data.get('token')
        if not self.token:
            raise Exception(f"Authentication failed: {data}")
        self.headers['Authorization'] = f'Bearer {self.token}'
        self.session.headers.update(self.headers)
        return True

    def get_session_commands(self, session_id, use_alt=False):
        """Fetch and print the commands of one session.

        Nothing is returned; session details, the command list and any
        failure message are all written to stdout.

        Args:
            session_id: identifier of the session to inspect.
            use_alt: when false (default) query
                ``/api/v1/terminal/commands/?session_id=...``; when true read
                the command list from ``/api/v1/terminal/sessions/{id}/``.
        """
        # Remove any "/api/v1" from base_url and append it once, so the URL is
        # the same whether or not base_url already carries that prefix.
        api_root = self.base_url.replace('/api/v1', '') + "/api/v1"
        if use_alt:
            response = self.session.get(f"{api_root}/terminal/sessions/{session_id}/")
            subject, render = "session", self._print_session_detail
        else:
            response = self.session.get(
                f"{api_root}/terminal/commands/", params={"session_id": session_id}
            )
            subject, render = "commands", self._print_command_listing

        if response.status_code != 200:
            print(f"Failed to get {subject}: {response.status_code} {response.text}")
            return

        # Top-level boundary: a malformed payload is reported, not raised.
        try:
            render(response.json(), session_id)
        except Exception as e:
            print(f"Error parsing response: {e}")

    def _print_session_detail(self, session_data, session_id):
        """Print a session-detail payload: its scalar fields, then its commands."""
        # Assumes the commands live under 'commands' or 'command_list'.
        commands_list = (
            session_data.get('commands') or session_data.get('command_list') or []
        )
        if not commands_list:
            print(f"No commands found for session {session_id} (via /terminal/sessions/{{id}}/)")
            return
        print("Session info:")
        for key, value in session_data.items():
            if key not in self._COMMAND_LIST_KEYS:
                print(f"  {key}: {value}")
        self._print_command_lines(commands_list)

    def _print_command_listing(self, payload, session_id):
        """Print a command-listing payload (bare list or ``{'results': [...]}``)."""
        if isinstance(payload, dict) and 'results' in payload:
            commands_list = payload['results']
        elif isinstance(payload, list):
            commands_list = payload
        else:
            commands_list = []
        if not commands_list:
            print(f"No commands found for session {session_id}")
            return
        # Session-level information is taken from the first command record.
        first_cmd = commands_list[0]
        print("Session info:")
        for field in self._SESSION_INFO_FIELDS:
            if field in first_cmd:
                print(f"  {field}: {first_cmd[field]}")
        self._print_session_duration(first_cmd)
        self._print_command_lines(commands_list)

    @staticmethod
    def _print_session_duration(record):
        """Print ``end_time - start_time`` when both parse as ISO timestamps."""
        start_time = record.get('start_time')
        end_time = record.get('end_time')
        if not (start_time and end_time):
            return
        try:
            start_dt = datetime.fromisoformat(start_time.replace('Z', '+00:00'))
            end_dt = datetime.fromisoformat(end_time.replace('Z', '+00:00'))
            duration = end_dt - start_dt
        except (ValueError, TypeError, AttributeError):
            # Best effort: non-string values, unparsable text, or mixing
            # naive/aware datetimes just means no duration line.
            return
        print(f"  session_duration: {duration}")

    @classmethod
    def _print_command_lines(cls, commands_list):
        """Print the "Commands:" section; records without command text are skipped."""
        print("\nCommands:")
        for cmd in commands_list:
            ts = cls._first_truthy(cmd, cls._TIMESTAMP_KEYS)
            command = cls._first_truthy(cmd, cls._COMMAND_KEYS)
            if not command:
                continue
            if ts:
                print(f"  [{ts}] {command}")
            else:
                print(f"  {command}")

    @staticmethod
    def _first_truthy(record, keys):
        """Return the first truthy ``record.get(key)`` over *keys*, else None."""
        for key in keys:
            value = record.get(key)
            if value:
                return value
        return None

# Script entry point: python get_session_commands.py <session_id> [--alt]
if __name__ == "__main__":
    # The session id is mandatory; without it show usage and exit with status 1.
    if not sys.argv[1:]:
        print("Usage: python get_session_commands.py <session_id> [--alt]")
        sys.exit(1)
    session_id = sys.argv[1]
    # '--alt' is honoured only as the argument right after the session id.
    use_alt = sys.argv[2:3] == ['--alt']
    api = JumpServerAPI()
    try:
        if api.login():
            api.get_session_commands(session_id, use_alt=use_alt)
    except Exception as exc:
        # Report the failure on stdout instead of showing a traceback.
        print(f"Error: {str(exc)}")