Compare commits

...

2 Commits

Author SHA1 Message Date
7bada37794 Keep connection open instead of reconnecting every time 2024-06-11 16:12:39 +02:00
d7c3cc3cd9 Backend for future Webapp 2024-06-11 15:53:57 +02:00
4 changed files with 350 additions and 281 deletions

296
OLDmain.py Normal file
View File

@@ -0,0 +1,296 @@
from rcon.battleye import Client
from rcon.battleye import Client, ServerMessage
import sys
import os
import json
import socket
from time import sleep
# Define the server details
SERVER_ADDRESS = ''
SERVER_PORT = 0
RCON_PASSWORD = ''
# Check if credentials file exists
if os.path.exists('credentials.json'):
# Load credentials from file
with open('credentials.json', 'r') as file:
credentials = json.load(file)
SERVER_ADDRESS = credentials.get('SERVER_ADDRESS', '')
SERVER_PORT = credentials.get('SERVER_PORT', 0)
RCON_PASSWORD = credentials.get('RCON_PASSWORD', '')
else:
# Generate example credentials file
credentials = {
'SERVER_ADDRESS': 'YOUR-SERVER-ADDRESS', # Example Values
'SERVER_PORT': 2303, # Example Values
'RCON_PASSWORD': 'RCON_PASSWORD' # Example Values
}
with open('credentials.json', 'w') as file:
json.dump(credentials, file, indent=4)
print("Please fill in the server details in the credentials.json file.")
sys.exit()
def print_menu():
print()
print("1. List Players")
print("2. Messaging")
print("3. Locking")
print("4. Kicking")
print("5. Banning")
print("6. Server menu")
print("7. Exit")
def handle_input(choice):
if choice == "1":
list_players()
elif choice == "2":
print_message_menu()
message_choice = input("Enter your choice: ")
handle_message_input(message_choice)
elif choice == "3":
print_locking_menu()
locking_choice = input("Enter your choice: ")
handle_locking_input(locking_choice)
elif choice == "4":
print_kick_menu()
kick_choice = input("Enter your choice: ")
handle_kick_input(kick_choice)
elif choice == "5":
print_banning_menu()
banning_choice = input("Enter your choice: ")
handle_banning_input(banning_choice)
elif choice == "6":
print_server_menu()
server_choice = input("Enter your choice: ")
handle_server_input(server_choice)
elif choice == "7":
sys.exit()
else:
print("Invalid choice. Please try again.")
def print_message_menu():
print("1. Send Global Message")
print("2. Send Direct Message")
print("3. Back")
def print_server_menu():
print("1. Soft Server Restart (Restart within 3 minutes with warning messages sent to the players)")
print("2. Hard Server Restart (Restart immediately without warning messages)")
print("3. Back")
def print_locking_menu():
print("1. Lock Server")
print("2. Unlock Server")
print("3. Back")
def print_kick_menu():
print("1. Kick Single Player")
print("2. Kick All Players")
print("3. Back")
def print_banning_menu():
print("1. Show Banlist")
print("2. Ban Player")
print("3. Ban Player by SteamID")
print("4. Unban Player")
print("5. Back")
def handle_message_input(choice):
if choice == "1":
send_global_message()
elif choice == "2":
send_direct_message()
elif choice == "3":
print_menu()
else:
print("Invalid choice. Please try again.")
def handle_locking_input(choice):
if choice == "1":
lock_server()
elif choice == "2":
unlock_server()
elif choice == "3":
print_menu()
else:
print("Invalid choice. Please try again.")
def handle_kick_input(choice):
if choice == "1":
kick_single_player()
elif choice == "2":
kick_all_players()
elif choice == "3":
print_menu()
else:
print("Invalid choice. Please try again.")
def handle_banning_input(choice):
if choice == "1":
show_banlist()
elif choice == "2":
ban_player()
elif choice == "3":
ban_player_by_steamid()
elif choice == "4":
unban_player()
elif choice == "5":
print_menu()
else:
print("Invalid choice. Please try again.")
def handle_server_input(choice):
if choice == "1":
soft_restart()
elif choice == "2":
confirm = input("Are you sure you want to perform a hard restart? (y/n): ")
if confirm.lower() == "y":
hard_restart()
else:
print("Hard restart cancelled.")
elif choice == "3":
print_menu()
else:
print("Invalid choice. Please try again.")
def send_global_message():
message = input("Enter the message to send: ")
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'Say -1 {message}')
print(response)
def send_direct_message():
list_players()
player_name = input("Enter the # (number) of the player to message: ")
message = input("Enter the message to send: ")
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'Say -{player_name} {message}')
print(response)
def hard_restart():
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('#shutdown')
print(response)
def soft_restart(): # Not implemented yet
print('not implemented yet')
pass
def list_players():
try:
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('players')
print(response)
except socket.timeout:
print("Connection timed out. Please check the server address, port, and network settings.")
except Exception as e:
print(f"An error occurred: {e}")
def kick_single_player():
list_players()
player_name = input("Enter the # (number) of the player to kick: ")
reason = input("Enter the reason for kicking the player (can be empty): ")
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'kick {player_name} "{reason}"')
if not response:
print("Player kicked successfully.")
else:
print(response)
def kick_all_players():
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('#kick -1')
print(response)
def show_banlist():
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('Bans')
print(response)
def ban_player():
list_players()
player_name = input("Enter the # (number) of the player to ban: ")
ban_time = input("Enter the ban time in minutes: ")
reason = input("Enter the reason for banning the player (can be empty): ")
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'ban {player_name} {ban_time} "{reason}"')
if not response:
print("Player banned successfully.")
else:
print(response)
def server_monitor():
while True:
try:
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('status')
print(response)
except socket.timeout:
print("Connection timed out. Please check the server address, port, and network settings.")
except Exception as e:
print(f"An error occurred: {e}")
sleep(30)
def ban_player_by_steamid():
steamid = input("Enter the SteamID of the player to ban: ")
ban_time = input("Enter the ban time in minutes: ")
reason = input("Enter the reason for banning the player (can be empty): ")
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'ban -1 {ban_time} "{reason}" {steamid}')
if not response:
print("Player banned successfully.")
else:
print(response)
def unban_player():
show_banlist()
ban_id = input("Enter the # (number) of the ban to remove: ")
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'removeBan {ban_id}')
if not response:
print("Ban removed successfully.")
else:
print(response)
def lock_server():
try:
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('#lock')
print(response)
except socket.timeout:
print("Connection timed out. Please check the server address, port, and network settings.")
except Exception as e:
print(f"An error occurred: {e}")
def unlock_server():
try:
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('#unlock')
print(response)
except socket.timeout:
print("Connection timed out. Please check the server address, port, and network settings.")
except Exception as e:
print(f"An error occurred: {e}")
def list_players():
try:
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('players')
print(response)
except socket.timeout:
print("Connection timed out. Please check the server address, port, and network settings.")
except Exception as e:
print(f"An error occurred: {e}")
def create_cli():
while True:
print_menu()
choice = input("Enter your choice: ")
handle_input(choice)
if __name__ == "__main__":
create_cli()

Binary file not shown.

328
main.py
View File

@@ -1,296 +1,66 @@
from rcon.battleye import Client
from rcon.battleye import Client, ServerMessage
import sys
import os
from fastapi import FastAPI, Depends
from pydantic import BaseModel
import json
import socket
from time import sleep
from rcon.battleye import Client
app = FastAPI()
# Define the server details
SERVER_ADDRESS = ''
SERVER_PORT = 0
RCON_PASSWORD = ''
# Check if credentials file exists
if os.path.exists('credentials.json'):
# Load credentials from file
with open('credentials.json', 'r') as file:
# Load credentials
credentials_file = 'credentials.json'
with open(credentials_file, 'r') as file:
credentials = json.load(file)
SERVER_ADDRESS = credentials.get('SERVER_ADDRESS', '')
SERVER_PORT = credentials.get('SERVER_PORT', 0)
RCON_PASSWORD = credentials.get('RCON_PASSWORD', '')
else:
# Generate example credentials file
credentials = {
'SERVER_ADDRESS': 'YOUR-SERVER-ADDRESS', # Example Values
'SERVER_PORT': 2303, # Example Values
'RCON_PASSWORD': 'RCON_PASSWORD' # Example Values
}
with open('credentials.json', 'w') as file:
json.dump(credentials, file, indent=4)
print("Please fill in the server details in the credentials.json file.")
sys.exit()
SERVER_ADDRESS = credentials.get('SERVER_ADDRESS', '')
SERVER_PORT = credentials.get('SERVER_PORT', 0)
RCON_PASSWORD = credentials.get('RCON_PASSWORD', '')
def print_menu():
print()
print("1. List Players")
print("2. Messaging")
print("3. Locking")
print("4. Kicking")
print("5. Banning")
print("6. Server menu")
print("7. Exit")
class RCONManager:
def __init__(self, address, port, password):
self.client = Client(address, port, passwd=password)
self.client.connect()
def handle_input(choice):
if choice == "1":
list_players()
elif choice == "2":
print_message_menu()
message_choice = input("Enter your choice: ")
handle_message_input(message_choice)
elif choice == "3":
print_locking_menu()
locking_choice = input("Enter your choice: ")
handle_locking_input(locking_choice)
elif choice == "4":
print_kick_menu()
kick_choice = input("Enter your choice: ")
handle_kick_input(kick_choice)
elif choice == "5":
print_banning_menu()
banning_choice = input("Enter your choice: ")
handle_banning_input(banning_choice)
elif choice == "6":
print_server_menu()
server_choice = input("Enter your choice: ")
handle_server_input(server_choice)
elif choice == "7":
sys.exit()
else:
print("Invalid choice. Please try again.")
def run(self, command):
return self.client.run(command)
def print_message_menu():
print("1. Send Global Message")
print("2. Send Direct Message")
print("3. Back")
def close(self):
self.client.disconnect()
def print_server_menu():
print("1. Soft Server Restart (Restart within 3 minutes with warning messages sent to the players)")
print("2. Hard Server Restart (Restart immediately without warning messages)")
print("3. Back")
rcon_manager = RCONManager(SERVER_ADDRESS, SERVER_PORT, RCON_PASSWORD)
def print_locking_menu():
print("1. Lock Server")
print("2. Unlock Server")
print("3. Back")
@app.on_event("shutdown")
def shutdown_event():
rcon_manager.close()
def print_kick_menu():
print("1. Kick Single Player")
print("2. Kick All Players")
print("3. Back")
class PlayerAction(BaseModel):
steamid: str
reason: str
duration: int
def print_banning_menu():
print("1. Show Banlist")
print("2. Ban Player")
print("3. Ban Player by SteamID")
print("4. Unban Player")
print("5. Back")
def get_rcon_manager():
return rcon_manager
def handle_message_input(choice):
if choice == "1":
send_global_message()
elif choice == "2":
send_direct_message()
elif choice == "3":
print_menu()
else:
print("Invalid choice. Please try again.")
@app.get("/players")
def list_players(rcon: RCONManager = Depends(get_rcon_manager)):
response = rcon.run('players')
return {"response": response}
def handle_locking_input(choice):
if choice == "1":
lock_server()
elif choice == "2":
unlock_server()
elif choice == "3":
print_menu()
else:
print("Invalid choice. Please try again.")
# Ähnliche Änderungen für die anderen Endpunkte anwenden
@app.post("/kick")
def kick_player(action: PlayerAction, rcon: RCONManager = Depends(get_rcon_manager)):
response = rcon.run(f'kick {action.steamid} {action.reason}')
return {"response": response}
def handle_kick_input(choice):
if choice == "1":
kick_single_player()
elif choice == "2":
kick_all_players()
elif choice == "3":
print_menu()
else:
print("Invalid choice. Please try again.")
@app.post("/ban")
def ban_player(action: PlayerAction, rcon: RCONManager = Depends(get_rcon_manager)):
response = rcon.run(f'ban {action.steamid} {action.duration} "{action.reason}"')
return {"response": response}
def handle_banning_input(choice):
if choice == "1":
show_banlist()
elif choice == "2":
ban_player()
elif choice == "3":
ban_player_by_steamid()
elif choice == "4":
unban_player()
elif choice == "5":
print_menu()
else:
print("Invalid choice. Please try again.")
@app.post("/unlock")
def unlock_server(rcon: RCONManager = Depends(get_rcon_manager)):
response = rcon.run('#unlock')
return {"response": response}
def handle_server_input(choice):
if choice == "1":
soft_restart()
elif choice == "2":
confirm = input("Are you sure you want to perform a hard restart? (y/n): ")
if confirm.lower() == "y":
hard_restart()
else:
print("Hard restart cancelled.")
elif choice == "3":
print_menu()
else:
print("Invalid choice. Please try again.")
def send_global_message():
message = input("Enter the message to send: ")
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'Say -1 {message}')
print(response)
def send_direct_message():
list_players()
player_name = input("Enter the # (number) of the player to message: ")
message = input("Enter the message to send: ")
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'Say -{player_name} {message}')
print(response)
def hard_restart():
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('#shutdown')
print(response)
def soft_restart(): # Not implemented yet
print('not implemented yet')
pass
def list_players():
try:
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('players')
print(response)
except socket.timeout:
print("Connection timed out. Please check the server address, port, and network settings.")
except Exception as e:
print(f"An error occurred: {e}")
def kick_single_player():
list_players()
player_name = input("Enter the # (number) of the player to kick: ")
reason = input("Enter the reason for kicking the player (can be empty): ")
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'kick {player_name} "{reason}"')
if not response:
print("Player kicked successfully.")
else:
print(response)
def kick_all_players():
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('#kick -1')
print(response)
def show_banlist():
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('Bans')
print(response)
def ban_player():
list_players()
player_name = input("Enter the # (number) of the player to ban: ")
ban_time = input("Enter the ban time in minutes: ")
reason = input("Enter the reason for banning the player (can be empty): ")
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'ban {player_name} {ban_time} "{reason}"')
if not response:
print("Player banned successfully.")
else:
print(response)
def server_monitor():
while True:
try:
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('status')
print(response)
except socket.timeout:
print("Connection timed out. Please check the server address, port, and network settings.")
except Exception as e:
print(f"An error occurred: {e}")
sleep(30)
def ban_player_by_steamid():
steamid = input("Enter the SteamID of the player to ban: ")
ban_time = input("Enter the ban time in minutes: ")
reason = input("Enter the reason for banning the player (can be empty): ")
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'ban -1 {ban_time} "{reason}" {steamid}')
if not response:
print("Player banned successfully.")
else:
print(response)
def unban_player():
show_banlist()
ban_id = input("Enter the # (number) of the ban to remove: ")
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'removeBan {ban_id}')
if not response:
print("Ban removed successfully.")
else:
print(response)
def lock_server():
try:
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('#lock')
print(response)
except socket.timeout:
print("Connection timed out. Please check the server address, port, and network settings.")
except Exception as e:
print(f"An error occurred: {e}")
def unlock_server():
try:
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('#unlock')
print(response)
except socket.timeout:
print("Connection timed out. Please check the server address, port, and network settings.")
except Exception as e:
print(f"An error occurred: {e}")
def list_players():
try:
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('players')
print(response)
except socket.timeout:
print("Connection timed out. Please check the server address, port, and network settings.")
except Exception as e:
print(f"An error occurred: {e}")
def create_cli():
while True:
print_menu()
choice = input("Enter your choice: ")
handle_input(choice)
if __name__ == "__main__":
create_cli()
@app.post("/lock")
def lock_server(rcon: RCONManager = Depends(get_rcon_manager)):
response = rcon.run('#lock')
return {"response": response}

View File

@@ -1 +1,4 @@
fastapi
uvicorn[standard]
rcon
pydantic