diff --git a/OLDmain.py b/OLDmain.py new file mode 100644 index 0000000..9f95ebf --- /dev/null +++ b/OLDmain.py @@ -0,0 +1,296 @@ +from rcon.battleye import Client +from rcon.battleye import Client, ServerMessage +import sys +import os +import json +import socket +from time import sleep + + + +# Define the server details +SERVER_ADDRESS = '' +SERVER_PORT = 0 +RCON_PASSWORD = '' + +# Check if credentials file exists +if os.path.exists('credentials.json'): + # Load credentials from file + with open('credentials.json', 'r') as file: + credentials = json.load(file) + SERVER_ADDRESS = credentials.get('SERVER_ADDRESS', '') + SERVER_PORT = credentials.get('SERVER_PORT', 0) + RCON_PASSWORD = credentials.get('RCON_PASSWORD', '') +else: + # Generate example credentials file + credentials = { + 'SERVER_ADDRESS': 'YOUR-SERVER-ADDRESS', # Example Values + 'SERVER_PORT': 2303, # Example Values + 'RCON_PASSWORD': 'RCON_PASSWORD' # Example Values + } + with open('credentials.json', 'w') as file: + json.dump(credentials, file, indent=4) + print("Please fill in the server details in the credentials.json file.") + sys.exit() + + +def print_menu(): + print() + print("1. List Players") + print("2. Messaging") + print("3. Locking") + print("4. Kicking") + print("5. Banning") + print("6. Server menu") + print("7. Exit") + +def handle_input(choice): + if choice == "1": + list_players() + elif choice == "2": + print_message_menu() + message_choice = input("Enter your choice: ") + handle_message_input(message_choice) + elif choice == "3": + print_locking_menu() + locking_choice = input("Enter your choice: ") + handle_locking_input(locking_choice) + elif choice == "4": + print_kick_menu() + kick_choice = input("Enter your choice: ") + handle_kick_input(kick_choice) + elif choice == "5": + print_banning_menu() + banning_choice = input("Enter your choice: ") + handle_banning_input(banning_choice) + elif choice == "6": + print_server_menu() + server_choice = input("Enter your choice: ") + handle_server_input(server_choice) + elif choice == "7": + sys.exit() + else: + print("Invalid choice. Please try again.") + +def print_message_menu(): + print("1. Send Global Message") + print("2. Send Direct Message") + print("3. Back") + +def print_server_menu(): + print("1. Soft Server Restart (Restart within 3 minutes with warning messages sent to the players)") + print("2. Hard Server Restart (Restart immediately without warning messages)") + print("3. Back") + +def print_locking_menu(): + print("1. Lock Server") + print("2. Unlock Server") + print("3. Back") + +def print_kick_menu(): + print("1. Kick Single Player") + print("2. Kick All Players") + print("3. Back") + +def print_banning_menu(): + print("1. Show Banlist") + print("2. Ban Player") + print("3. Ban Player by SteamID") + print("4. Unban Player") + print("5. Back") + +def handle_message_input(choice): + if choice == "1": + send_global_message() + elif choice == "2": + send_direct_message() + elif choice == "3": + print_menu() + else: + print("Invalid choice. Please try again.") + +def handle_locking_input(choice): + if choice == "1": + lock_server() + elif choice == "2": + unlock_server() + elif choice == "3": + print_menu() + else: + print("Invalid choice. Please try again.") + +def handle_kick_input(choice): + if choice == "1": + kick_single_player() + elif choice == "2": + kick_all_players() + elif choice == "3": + print_menu() + else: + print("Invalid choice. Please try again.") + +def handle_banning_input(choice): + if choice == "1": + show_banlist() + elif choice == "2": + ban_player() + elif choice == "3": + ban_player_by_steamid() + elif choice == "4": + unban_player() + elif choice == "5": + print_menu() + else: + print("Invalid choice. Please try again.") + +def handle_server_input(choice): + if choice == "1": + soft_restart() + elif choice == "2": + confirm = input("Are you sure you want to perform a hard restart? (y/n): ") + if confirm.lower() == "y": + hard_restart() + else: + print("Hard restart cancelled.") + elif choice == "3": + print_menu() + else: + print("Invalid choice. Please try again.") + +def send_global_message(): + message = input("Enter the message to send: ") + with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: + response = client.run(f'Say -1 {message}') + print(response) + +def send_direct_message(): + list_players() + player_name = input("Enter the # (number) of the player to message: ") + message = input("Enter the message to send: ") + with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: + response = client.run(f'Say -{player_name} {message}') + print(response) + +def hard_restart(): + with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: + response = client.run('#shutdown') + print(response) + +def soft_restart(): # Not implemented yet + print('not implemented yet') + pass + +def list_players(): + try: + with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: + response = client.run('players') + print(response) + except socket.timeout: + print("Connection timed out. Please check the server address, port, and network settings.") + except Exception as e: + print(f"An error occurred: {e}") + +def kick_single_player(): + list_players() + player_name = input("Enter the # (number) of the player to kick: ") + reason = input("Enter the reason for kicking the player (can be empty): ") + with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: + response = client.run(f'kick {player_name} "{reason}"') + if not response: + print("Player kicked successfully.") + else: + print(response) + +def kick_all_players(): + with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: + response = client.run('#kick -1') + print(response) + +def show_banlist(): + with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: + response = client.run('Bans') + print(response) + +def ban_player(): + list_players() + player_name = input("Enter the # (number) of the player to ban: ") + ban_time = input("Enter the ban time in minutes: ") + reason = input("Enter the reason for banning the player (can be empty): ") + with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: + response = client.run(f'ban {player_name} {ban_time} "{reason}"') + if not response: + print("Player banned successfully.") + else: + print(response) + +def server_monitor(): + while True: + try: + with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: + response = client.run('status') + print(response) + except socket.timeout: + print("Connection timed out. Please check the server address, port, and network settings.") + except Exception as e: + print(f"An error occurred: {e}") + sleep(30) + +def ban_player_by_steamid(): + steamid = input("Enter the SteamID of the player to ban: ") + ban_time = input("Enter the ban time in minutes: ") + reason = input("Enter the reason for banning the player (can be empty): ") + with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: + response = client.run(f'ban -1 {ban_time} "{reason}" {steamid}') + if not response: + print("Player banned successfully.") + else: + print(response) + +def unban_player(): + show_banlist() + ban_id = input("Enter the # (number) of the ban to remove: ") + with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: + response = client.run(f'removeBan {ban_id}') + if not response: + print("Ban removed successfully.") + else: + print(response) + +def lock_server(): + try: + with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: + response = client.run('#lock') + print(response) + except socket.timeout: + print("Connection timed out. Please check the server address, port, and network settings.") + except Exception as e: + print(f"An error occurred: {e}") + +def unlock_server(): + try: + with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: + response = client.run('#unlock') + print(response) + except socket.timeout: + print("Connection timed out. Please check the server address, port, and network settings.") + except Exception as e: + print(f"An error occurred: {e}") + +def list_players(): + try: + with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: + response = client.run('players') + print(response) + except socket.timeout: + print("Connection timed out. Please check the server address, port, and network settings.") + except Exception as e: + print(f"An error occurred: {e}") + +def create_cli(): + while True: + print_menu() + choice = input("Enter your choice: ") + handle_input(choice) + +if __name__ == "__main__": + create_cli() diff --git a/__pycache__/main.cpython-312.pyc b/__pycache__/main.cpython-312.pyc new file mode 100644 index 0000000..70d9475 Binary files /dev/null and b/__pycache__/main.cpython-312.pyc differ diff --git a/main.py b/main.py index 9f95ebf..a880789 100644 --- a/main.py +++ b/main.py @@ -1,296 +1,51 @@ +from fastapi import FastAPI +from pydantic import BaseModel from rcon.battleye import Client -from rcon.battleye import Client, ServerMessage -import sys -import os import json -import socket -from time import sleep +import os +app = FastAPI() +# Load credentials +credentials_file = 'credentials.json' +with open(credentials_file, 'r') as file: + credentials = json.load(file) -# Define the server details -SERVER_ADDRESS = '' -SERVER_PORT = 0 -RCON_PASSWORD = '' +SERVER_ADDRESS = credentials.get('SERVER_ADDRESS', '') +SERVER_PORT = credentials.get('SERVER_PORT', 0) +RCON_PASSWORD = credentials.get('RCON_PASSWORD', '') -# Check if credentials file exists -if os.path.exists('credentials.json'): - # Load credentials from file - with open('credentials.json', 'r') as file: - credentials = json.load(file) - SERVER_ADDRESS = credentials.get('SERVER_ADDRESS', '') - SERVER_PORT = credentials.get('SERVER_PORT', 0) - RCON_PASSWORD = credentials.get('RCON_PASSWORD', '') -else: - # Generate example credentials file - credentials = { - 'SERVER_ADDRESS': 'YOUR-SERVER-ADDRESS', # Example Values - 'SERVER_PORT': 2303, # Example Values - 'RCON_PASSWORD': 'RCON_PASSWORD' # Example Values - } - with open('credentials.json', 'w') as file: - json.dump(credentials, file, indent=4) - print("Please fill in the server details in the credentials.json file.") - sys.exit() - - -def print_menu(): - print() - print("1. List Players") - print("2. Messaging") - print("3. Locking") - print("4. Kicking") - print("5. Banning") - print("6. Server menu") - print("7. Exit") - -def handle_input(choice): - if choice == "1": - list_players() - elif choice == "2": - print_message_menu() - message_choice = input("Enter your choice: ") - handle_message_input(message_choice) - elif choice == "3": - print_locking_menu() - locking_choice = input("Enter your choice: ") - handle_locking_input(locking_choice) - elif choice == "4": - print_kick_menu() - kick_choice = input("Enter your choice: ") - handle_kick_input(kick_choice) - elif choice == "5": - print_banning_menu() - banning_choice = input("Enter your choice: ") - handle_banning_input(banning_choice) - elif choice == "6": - print_server_menu() - server_choice = input("Enter your choice: ") - handle_server_input(server_choice) - elif choice == "7": - sys.exit() - else: - print("Invalid choice. Please try again.") - -def print_message_menu(): - print("1. Send Global Message") - print("2. Send Direct Message") - print("3. Back") - -def print_server_menu(): - print("1. Soft Server Restart (Restart within 3 minutes with warning messages sent to the players)") - print("2. Hard Server Restart (Restart immediately without warning messages)") - print("3. Back") - -def print_locking_menu(): - print("1. Lock Server") - print("2. Unlock Server") - print("3. Back") - -def print_kick_menu(): - print("1. Kick Single Player") - print("2. Kick All Players") - print("3. Back") - -def print_banning_menu(): - print("1. Show Banlist") - print("2. Ban Player") - print("3. Ban Player by SteamID") - print("4. Unban Player") - print("5. Back") - -def handle_message_input(choice): - if choice == "1": - send_global_message() - elif choice == "2": - send_direct_message() - elif choice == "3": - print_menu() - else: - print("Invalid choice. Please try again.") - -def handle_locking_input(choice): - if choice == "1": - lock_server() - elif choice == "2": - unlock_server() - elif choice == "3": - print_menu() - else: - print("Invalid choice. Please try again.") - -def handle_kick_input(choice): - if choice == "1": - kick_single_player() - elif choice == "2": - kick_all_players() - elif choice == "3": - print_menu() - else: - print("Invalid choice. Please try again.") - -def handle_banning_input(choice): - if choice == "1": - show_banlist() - elif choice == "2": - ban_player() - elif choice == "3": - ban_player_by_steamid() - elif choice == "4": - unban_player() - elif choice == "5": - print_menu() - else: - print("Invalid choice. Please try again.") - -def handle_server_input(choice): - if choice == "1": - soft_restart() - elif choice == "2": - confirm = input("Are you sure you want to perform a hard restart? (y/n): ") - if confirm.lower() == "y": - hard_restart() - else: - print("Hard restart cancelled.") - elif choice == "3": - print_menu() - else: - print("Invalid choice. Please try again.") - -def send_global_message(): - message = input("Enter the message to send: ") - with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: - response = client.run(f'Say -1 {message}') - print(response) - -def send_direct_message(): - list_players() - player_name = input("Enter the # (number) of the player to message: ") - message = input("Enter the message to send: ") - with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: - response = client.run(f'Say -{player_name} {message}') - print(response) - -def hard_restart(): - with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: - response = client.run('#shutdown') - print(response) - -def soft_restart(): # Not implemented yet - print('not implemented yet') - pass +class PlayerAction(BaseModel): + steamid: str + reason: str + duration: int +@app.get("/players") def list_players(): - try: - with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: - response = client.run('players') - print(response) - except socket.timeout: - print("Connection timed out. Please check the server address, port, and network settings.") - except Exception as e: - print(f"An error occurred: {e}") - -def kick_single_player(): - list_players() - player_name = input("Enter the # (number) of the player to kick: ") - reason = input("Enter the reason for kicking the player (can be empty): ") with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: - response = client.run(f'kick {player_name} "{reason}"') - if not response: - print("Player kicked successfully.") - else: - print(response) + response = client.run('players') + return {"response": response} -def kick_all_players(): +@app.post("/kick") +def kick_player(action: PlayerAction): with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: - response = client.run('#kick -1') - print(response) + response = client.run(f'kick {action.steamid} {action.reason}') + return {"response": response} -def show_banlist(): +@app.post("/ban") +def ban_player(action: PlayerAction): with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: - response = client.run('Bans') - print(response) + response = client.run(f'ban {action.steamid} {action.duration} "{action.reason}"') + return {"response": response} -def ban_player(): - list_players() - player_name = input("Enter the # (number) of the player to ban: ") - ban_time = input("Enter the ban time in minutes: ") - reason = input("Enter the reason for banning the player (can be empty): ") +@app.post("/unlock") +def unlock_server(): with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: - response = client.run(f'ban {player_name} {ban_time} "{reason}"') - if not response: - print("Player banned successfully.") - else: - print(response) + response = client.run('#unlock') + return {"response": response} -def server_monitor(): - while True: - try: - with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: - response = client.run('status') - print(response) - except socket.timeout: - print("Connection timed out. Please check the server address, port, and network settings.") - except Exception as e: - print(f"An error occurred: {e}") - sleep(30) - -def ban_player_by_steamid(): - steamid = input("Enter the SteamID of the player to ban: ") - ban_time = input("Enter the ban time in minutes: ") - reason = input("Enter the reason for banning the player (can be empty): ") +@app.post("/lock") +def lock_server(): with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: - response = client.run(f'ban -1 {ban_time} "{reason}" {steamid}') - if not response: - print("Player banned successfully.") - else: - print(response) - -def unban_player(): - show_banlist() - ban_id = input("Enter the # (number) of the ban to remove: ") - with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: - response = client.run(f'removeBan {ban_id}') - if not response: - print("Ban removed successfully.") - else: - print(response) - -def lock_server(): - try: - with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: - response = client.run('#lock') - print(response) - except socket.timeout: - print("Connection timed out. Please check the server address, port, and network settings.") - except Exception as e: - print(f"An error occurred: {e}") - -def unlock_server(): - try: - with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: - response = client.run('#unlock') - print(response) - except socket.timeout: - print("Connection timed out. Please check the server address, port, and network settings.") - except Exception as e: - print(f"An error occurred: {e}") - -def list_players(): - try: - with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client: - response = client.run('players') - print(response) - except socket.timeout: - print("Connection timed out. Please check the server address, port, and network settings.") - except Exception as e: - print(f"An error occurred: {e}") - -def create_cli(): - while True: - print_menu() - choice = input("Enter your choice: ") - handle_input(choice) - -if __name__ == "__main__": - create_cli() + response = client.run('#lock') + return {"response": response} diff --git a/requirements.txt b/requirements.txt index 7201105..f1cd47c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,4 @@ -rcon \ No newline at end of file +fastapi +uvicorn[standard] +rcon +pydantic \ No newline at end of file