Backend for future Webapp

This commit is contained in:
2024-06-11 15:53:57 +02:00
parent adfbe32698
commit d7c3cc3cd9
4 changed files with 334 additions and 280 deletions

313
main.py
View File

@@ -1,296 +1,51 @@
from fastapi import FastAPI
from pydantic import BaseModel
from rcon.battleye import Client
from rcon.battleye import Client, ServerMessage
import sys
import os
import json
import socket
from time import sleep
import os
app = FastAPI()
# Load credentials
credentials_file = 'credentials.json'
with open(credentials_file, 'r') as file:
credentials = json.load(file)
# Define the server details
SERVER_ADDRESS = ''
SERVER_PORT = 0
RCON_PASSWORD = ''
SERVER_ADDRESS = credentials.get('SERVER_ADDRESS', '')
SERVER_PORT = credentials.get('SERVER_PORT', 0)
RCON_PASSWORD = credentials.get('RCON_PASSWORD', '')
# Check if credentials file exists
if os.path.exists('credentials.json'):
# Load credentials from file
with open('credentials.json', 'r') as file:
credentials = json.load(file)
SERVER_ADDRESS = credentials.get('SERVER_ADDRESS', '')
SERVER_PORT = credentials.get('SERVER_PORT', 0)
RCON_PASSWORD = credentials.get('RCON_PASSWORD', '')
else:
# Generate example credentials file
credentials = {
'SERVER_ADDRESS': 'YOUR-SERVER-ADDRESS', # Example Values
'SERVER_PORT': 2303, # Example Values
'RCON_PASSWORD': 'RCON_PASSWORD' # Example Values
}
with open('credentials.json', 'w') as file:
json.dump(credentials, file, indent=4)
print("Please fill in the server details in the credentials.json file.")
sys.exit()
def print_menu():
print()
print("1. List Players")
print("2. Messaging")
print("3. Locking")
print("4. Kicking")
print("5. Banning")
print("6. Server menu")
print("7. Exit")
def handle_input(choice):
if choice == "1":
list_players()
elif choice == "2":
print_message_menu()
message_choice = input("Enter your choice: ")
handle_message_input(message_choice)
elif choice == "3":
print_locking_menu()
locking_choice = input("Enter your choice: ")
handle_locking_input(locking_choice)
elif choice == "4":
print_kick_menu()
kick_choice = input("Enter your choice: ")
handle_kick_input(kick_choice)
elif choice == "5":
print_banning_menu()
banning_choice = input("Enter your choice: ")
handle_banning_input(banning_choice)
elif choice == "6":
print_server_menu()
server_choice = input("Enter your choice: ")
handle_server_input(server_choice)
elif choice == "7":
sys.exit()
else:
print("Invalid choice. Please try again.")
def print_message_menu():
print("1. Send Global Message")
print("2. Send Direct Message")
print("3. Back")
def print_server_menu():
print("1. Soft Server Restart (Restart within 3 minutes with warning messages sent to the players)")
print("2. Hard Server Restart (Restart immediately without warning messages)")
print("3. Back")
def print_locking_menu():
print("1. Lock Server")
print("2. Unlock Server")
print("3. Back")
def print_kick_menu():
print("1. Kick Single Player")
print("2. Kick All Players")
print("3. Back")
def print_banning_menu():
print("1. Show Banlist")
print("2. Ban Player")
print("3. Ban Player by SteamID")
print("4. Unban Player")
print("5. Back")
def handle_message_input(choice):
if choice == "1":
send_global_message()
elif choice == "2":
send_direct_message()
elif choice == "3":
print_menu()
else:
print("Invalid choice. Please try again.")
def handle_locking_input(choice):
if choice == "1":
lock_server()
elif choice == "2":
unlock_server()
elif choice == "3":
print_menu()
else:
print("Invalid choice. Please try again.")
def handle_kick_input(choice):
if choice == "1":
kick_single_player()
elif choice == "2":
kick_all_players()
elif choice == "3":
print_menu()
else:
print("Invalid choice. Please try again.")
def handle_banning_input(choice):
if choice == "1":
show_banlist()
elif choice == "2":
ban_player()
elif choice == "3":
ban_player_by_steamid()
elif choice == "4":
unban_player()
elif choice == "5":
print_menu()
else:
print("Invalid choice. Please try again.")
def handle_server_input(choice):
if choice == "1":
soft_restart()
elif choice == "2":
confirm = input("Are you sure you want to perform a hard restart? (y/n): ")
if confirm.lower() == "y":
hard_restart()
else:
print("Hard restart cancelled.")
elif choice == "3":
print_menu()
else:
print("Invalid choice. Please try again.")
def send_global_message():
message = input("Enter the message to send: ")
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'Say -1 {message}')
print(response)
def send_direct_message():
list_players()
player_name = input("Enter the # (number) of the player to message: ")
message = input("Enter the message to send: ")
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'Say -{player_name} {message}')
print(response)
def hard_restart():
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('#shutdown')
print(response)
def soft_restart(): # Not implemented yet
print('not implemented yet')
pass
class PlayerAction(BaseModel):
steamid: str
reason: str
duration: int
@app.get("/players")
def list_players():
try:
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('players')
print(response)
except socket.timeout:
print("Connection timed out. Please check the server address, port, and network settings.")
except Exception as e:
print(f"An error occurred: {e}")
def kick_single_player():
list_players()
player_name = input("Enter the # (number) of the player to kick: ")
reason = input("Enter the reason for kicking the player (can be empty): ")
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'kick {player_name} "{reason}"')
if not response:
print("Player kicked successfully.")
else:
print(response)
response = client.run('players')
return {"response": response}
def kick_all_players():
@app.post("/kick")
def kick_player(action: PlayerAction):
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('#kick -1')
print(response)
response = client.run(f'kick {action.steamid} {action.reason}')
return {"response": response}
def show_banlist():
@app.post("/ban")
def ban_player(action: PlayerAction):
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('Bans')
print(response)
response = client.run(f'ban {action.steamid} {action.duration} "{action.reason}"')
return {"response": response}
def ban_player():
list_players()
player_name = input("Enter the # (number) of the player to ban: ")
ban_time = input("Enter the ban time in minutes: ")
reason = input("Enter the reason for banning the player (can be empty): ")
@app.post("/unlock")
def unlock_server():
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'ban {player_name} {ban_time} "{reason}"')
if not response:
print("Player banned successfully.")
else:
print(response)
response = client.run('#unlock')
return {"response": response}
def server_monitor():
while True:
try:
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('status')
print(response)
except socket.timeout:
print("Connection timed out. Please check the server address, port, and network settings.")
except Exception as e:
print(f"An error occurred: {e}")
sleep(30)
def ban_player_by_steamid():
steamid = input("Enter the SteamID of the player to ban: ")
ban_time = input("Enter the ban time in minutes: ")
reason = input("Enter the reason for banning the player (can be empty): ")
@app.post("/lock")
def lock_server():
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'ban -1 {ban_time} "{reason}" {steamid}')
if not response:
print("Player banned successfully.")
else:
print(response)
def unban_player():
show_banlist()
ban_id = input("Enter the # (number) of the ban to remove: ")
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'removeBan {ban_id}')
if not response:
print("Ban removed successfully.")
else:
print(response)
def lock_server():
try:
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('#lock')
print(response)
except socket.timeout:
print("Connection timed out. Please check the server address, port, and network settings.")
except Exception as e:
print(f"An error occurred: {e}")
def unlock_server():
try:
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('#unlock')
print(response)
except socket.timeout:
print("Connection timed out. Please check the server address, port, and network settings.")
except Exception as e:
print(f"An error occurred: {e}")
def list_players():
try:
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('players')
print(response)
except socket.timeout:
print("Connection timed out. Please check the server address, port, and network settings.")
except Exception as e:
print(f"An error occurred: {e}")
def create_cli():
while True:
print_menu()
choice = input("Enter your choice: ")
handle_input(choice)
if __name__ == "__main__":
create_cli()
response = client.run('#lock')
return {"response": response}