Switched to berconpy, implemented listener

This commit is contained in:
2024-06-13 00:07:40 +02:00
parent 7bada37794
commit 1b897a70a9
7 changed files with 205 additions and 364 deletions

5
.gitignore vendored
View File

@@ -1 +1,4 @@
credentials.json
credentials.json
\venv\
*.env
\__pycache__\

View File

@@ -1,296 +0,0 @@
from rcon.battleye import Client
from rcon.battleye import Client, ServerMessage
import sys
import os
import json
import socket
from time import sleep
# Define the server details
SERVER_ADDRESS = ''
SERVER_PORT = 0
RCON_PASSWORD = ''
# Check if credentials file exists
if os.path.exists('credentials.json'):
# Load credentials from file
with open('credentials.json', 'r') as file:
credentials = json.load(file)
SERVER_ADDRESS = credentials.get('SERVER_ADDRESS', '')
SERVER_PORT = credentials.get('SERVER_PORT', 0)
RCON_PASSWORD = credentials.get('RCON_PASSWORD', '')
else:
# Generate example credentials file
credentials = {
'SERVER_ADDRESS': 'YOUR-SERVER-ADDRESS', # Example Values
'SERVER_PORT': 2303, # Example Values
'RCON_PASSWORD': 'RCON_PASSWORD' # Example Values
}
with open('credentials.json', 'w') as file:
json.dump(credentials, file, indent=4)
print("Please fill in the server details in the credentials.json file.")
sys.exit()
def print_menu():
print()
print("1. List Players")
print("2. Messaging")
print("3. Locking")
print("4. Kicking")
print("5. Banning")
print("6. Server menu")
print("7. Exit")
def handle_input(choice):
if choice == "1":
list_players()
elif choice == "2":
print_message_menu()
message_choice = input("Enter your choice: ")
handle_message_input(message_choice)
elif choice == "3":
print_locking_menu()
locking_choice = input("Enter your choice: ")
handle_locking_input(locking_choice)
elif choice == "4":
print_kick_menu()
kick_choice = input("Enter your choice: ")
handle_kick_input(kick_choice)
elif choice == "5":
print_banning_menu()
banning_choice = input("Enter your choice: ")
handle_banning_input(banning_choice)
elif choice == "6":
print_server_menu()
server_choice = input("Enter your choice: ")
handle_server_input(server_choice)
elif choice == "7":
sys.exit()
else:
print("Invalid choice. Please try again.")
def print_message_menu():
print("1. Send Global Message")
print("2. Send Direct Message")
print("3. Back")
def print_server_menu():
print("1. Soft Server Restart (Restart within 3 minutes with warning messages sent to the players)")
print("2. Hard Server Restart (Restart immediately without warning messages)")
print("3. Back")
def print_locking_menu():
print("1. Lock Server")
print("2. Unlock Server")
print("3. Back")
def print_kick_menu():
print("1. Kick Single Player")
print("2. Kick All Players")
print("3. Back")
def print_banning_menu():
print("1. Show Banlist")
print("2. Ban Player")
print("3. Ban Player by SteamID")
print("4. Unban Player")
print("5. Back")
def handle_message_input(choice):
if choice == "1":
send_global_message()
elif choice == "2":
send_direct_message()
elif choice == "3":
print_menu()
else:
print("Invalid choice. Please try again.")
def handle_locking_input(choice):
if choice == "1":
lock_server()
elif choice == "2":
unlock_server()
elif choice == "3":
print_menu()
else:
print("Invalid choice. Please try again.")
def handle_kick_input(choice):
if choice == "1":
kick_single_player()
elif choice == "2":
kick_all_players()
elif choice == "3":
print_menu()
else:
print("Invalid choice. Please try again.")
def handle_banning_input(choice):
if choice == "1":
show_banlist()
elif choice == "2":
ban_player()
elif choice == "3":
ban_player_by_steamid()
elif choice == "4":
unban_player()
elif choice == "5":
print_menu()
else:
print("Invalid choice. Please try again.")
def handle_server_input(choice):
if choice == "1":
soft_restart()
elif choice == "2":
confirm = input("Are you sure you want to perform a hard restart? (y/n): ")
if confirm.lower() == "y":
hard_restart()
else:
print("Hard restart cancelled.")
elif choice == "3":
print_menu()
else:
print("Invalid choice. Please try again.")
def send_global_message():
message = input("Enter the message to send: ")
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'Say -1 {message}')
print(response)
def send_direct_message():
list_players()
player_name = input("Enter the # (number) of the player to message: ")
message = input("Enter the message to send: ")
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'Say -{player_name} {message}')
print(response)
def hard_restart():
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('#shutdown')
print(response)
def soft_restart(): # Not implemented yet
print('not implemented yet')
pass
def list_players():
try:
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('players')
print(response)
except socket.timeout:
print("Connection timed out. Please check the server address, port, and network settings.")
except Exception as e:
print(f"An error occurred: {e}")
def kick_single_player():
list_players()
player_name = input("Enter the # (number) of the player to kick: ")
reason = input("Enter the reason for kicking the player (can be empty): ")
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'kick {player_name} "{reason}"')
if not response:
print("Player kicked successfully.")
else:
print(response)
def kick_all_players():
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('#kick -1')
print(response)
def show_banlist():
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('Bans')
print(response)
def ban_player():
list_players()
player_name = input("Enter the # (number) of the player to ban: ")
ban_time = input("Enter the ban time in minutes: ")
reason = input("Enter the reason for banning the player (can be empty): ")
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'ban {player_name} {ban_time} "{reason}"')
if not response:
print("Player banned successfully.")
else:
print(response)
def server_monitor():
while True:
try:
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('status')
print(response)
except socket.timeout:
print("Connection timed out. Please check the server address, port, and network settings.")
except Exception as e:
print(f"An error occurred: {e}")
sleep(30)
def ban_player_by_steamid():
steamid = input("Enter the SteamID of the player to ban: ")
ban_time = input("Enter the ban time in minutes: ")
reason = input("Enter the reason for banning the player (can be empty): ")
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'ban -1 {ban_time} "{reason}" {steamid}')
if not response:
print("Player banned successfully.")
else:
print(response)
def unban_player():
show_banlist()
ban_id = input("Enter the # (number) of the ban to remove: ")
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run(f'removeBan {ban_id}')
if not response:
print("Ban removed successfully.")
else:
print(response)
def lock_server():
try:
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('#lock')
print(response)
except socket.timeout:
print("Connection timed out. Please check the server address, port, and network settings.")
except Exception as e:
print(f"An error occurred: {e}")
def unlock_server():
try:
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('#unlock')
print(response)
except socket.timeout:
print("Connection timed out. Please check the server address, port, and network settings.")
except Exception as e:
print(f"An error occurred: {e}")
def list_players():
try:
with Client(SERVER_ADDRESS, SERVER_PORT, passwd=RCON_PASSWORD) as client:
response = client.run('players')
print(response)
except socket.timeout:
print("Connection timed out. Please check the server address, port, and network settings.")
except Exception as e:
print(f"An error occurred: {e}")
def create_cli():
while True:
print_menu()
choice = input("Enter your choice: ")
handle_input(choice)
if __name__ == "__main__":
create_cli()

View File

@@ -1,3 +1,6 @@
## This is the API Branch of this project.
The mail goal is to provide an RCON backend that can be used by another container (Werbserver)
## DayZ Server Management Tool
This project is a Python tool for managing a DayZ server using the BattlEye RCON protocol. Key features include:

146
api.py Normal file
View File

@@ -0,0 +1,146 @@
import asyncio
import logging
import uvicorn
import json
import os
from pydantic import BaseModel
import berconpy as rcon
from fastapi import FastAPI, HTTPException, Depends
# RCON Server Details
credentials_file = os.path.join(os.path.dirname(__file__), "credentials.json")
with open(credentials_file) as f:
credentials = json.load(f)
IP_ADDR = credentials["SERVER_ADDRESS"]
PORT = credentials["SERVER_PORT"]
PASSWORD = credentials["RCON_PASSWORD"]
# Logging Configuration
log = logging.getLogger("berconpy")
log.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s:%(levelname)s:%(name)s: %(message)s"))
log.addHandler(handler)
# RCON Client Initialization
client = rcon.AsyncRCONClient()
# FastAPI Initialization
app = FastAPI()
# Player Model
class Player(BaseModel):
name: str
score: int
class PlayerAction(BaseModel):
playerid: str
reason: str
duration: int = None # Optional for ban duration
# Dependency to get RCON manager
async def get_rcon_manager():
async with client.connect(IP_ADDR, PORT, PASSWORD):
yield client
# FastAPI Endpoint to List Players
@app.get("/players")
async def list_players(rcon: rcon.AsyncRCONClient = Depends(get_rcon_manager)):
try:
response = await rcon.send_command("players")
players = []
for line in response.splitlines():
parts = line.split()
if len(parts) >= 2:
name = parts[0]
try:
score = int(parts[1])
except ValueError:
score = 0
player = Player(name=name, score=score)
players.append(player)
return players
except Exception as e:
log.error(f"Error fetching players: {e}")
raise HTTPException(status_code=500, detail="Error fetching players")
# Kick Player Endpoint
@app.post("/kick")
async def kick_player(action: PlayerAction, rcon: rcon.AsyncRCONClient = Depends(get_rcon_manager)):
try:
response = await rcon.send_command(f'kick {action.playerid} {action.reason}')
return {"response": response}
except Exception as e:
log.error(f"Error kicking player: {e}")
raise HTTPException(status_code=500, detail="Error kicking player")
# Ban Player Endpoint
@app.post("/ban")
async def ban_player(action: PlayerAction, rcon: rcon.AsyncRCONClient = Depends(get_rcon_manager)):
try:
duration_str = f"{action.duration} " if action.duration else ""
response = await rcon.send_command(f'ban {action.playerid} {duration_str}"{action.reason}"')
return {"response": response}
except Exception as e:
log.error(f"Error banning player: {e}")
raise HTTPException(status_code=500, detail="Error banning player")
# Unlock Server Endpoint
@app.post("/unlock")
async def unlock_server(rcon: rcon.AsyncRCONClient = Depends(get_rcon_manager)):
try:
response = await rcon.send_command('#unlock')
return {"response": response}
except Exception as e:
log.error(f"Error unlocking server: {e}")
raise HTTPException(status_code=500, detail="Error unlocking server")
# Global Message Endpoint
@app.post("/global_message")
async def global_message(message: str, rcon: rcon.AsyncRCONClient = Depends(get_rcon_manager)):
try:
response = await rcon.send_command(f'Say -1 {message}')
return {"response": response}
except Exception as e:
log.error(f"Error sending global message: {e}")
raise HTTPException(status_code=500, detail="Error sending global message")
# Direct Message Endpoint
@app.post("/direct_message")
async def direct_message(playerid: str, message: str, rcon: rcon.AsyncRCONClient = Depends(get_rcon_manager)):
try:
response = await rcon.send_command(f'Say {playerid} {message}')
return {"response": response}
except Exception as e:
log.error(f"Error sending direct message: {e}")
raise HTTPException(status_code=500, detail="Error sending direct message")
# Lock Server Endpoint
@app.post("/lock")
async def lock_server(rcon: rcon.AsyncRCONClient = Depends(get_rcon_manager)):
try:
response = await rcon.send_command('#lock')
return {"response": response}
except Exception as e:
log.error(f"Error locking server: {e}")
raise HTTPException(status_code=500, detail="Error locking server")
# RCON Event Handlers
@client.dispatch.on_login
async def on_login():
print("Connected to RCON server.")
@client.dispatch.on_message
async def on_message(message: str):
print("Received message from server:", message)
@client.dispatch.on_command
async def server_response_to_command(response: str):
if not response:
return print("on_command: <empty>")
print("on_command:", response)
# Main Function to Run FastAPI App
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)

66
main.py
View File

@@ -1,66 +0,0 @@
from fastapi import FastAPI, Depends
from pydantic import BaseModel
import json
from rcon.battleye import Client
app = FastAPI()
# Load credentials
credentials_file = 'credentials.json'
with open(credentials_file, 'r') as file:
credentials = json.load(file)
SERVER_ADDRESS = credentials.get('SERVER_ADDRESS', '')
SERVER_PORT = credentials.get('SERVER_PORT', 0)
RCON_PASSWORD = credentials.get('RCON_PASSWORD', '')
class RCONManager:
def __init__(self, address, port, password):
self.client = Client(address, port, passwd=password)
self.client.connect()
def run(self, command):
return self.client.run(command)
def close(self):
self.client.disconnect()
rcon_manager = RCONManager(SERVER_ADDRESS, SERVER_PORT, RCON_PASSWORD)
@app.on_event("shutdown")
def shutdown_event():
rcon_manager.close()
class PlayerAction(BaseModel):
steamid: str
reason: str
duration: int
def get_rcon_manager():
return rcon_manager
@app.get("/players")
def list_players(rcon: RCONManager = Depends(get_rcon_manager)):
response = rcon.run('players')
return {"response": response}
# Ähnliche Änderungen für die anderen Endpunkte anwenden
@app.post("/kick")
def kick_player(action: PlayerAction, rcon: RCONManager = Depends(get_rcon_manager)):
response = rcon.run(f'kick {action.steamid} {action.reason}')
return {"response": response}
@app.post("/ban")
def ban_player(action: PlayerAction, rcon: RCONManager = Depends(get_rcon_manager)):
response = rcon.run(f'ban {action.steamid} {action.duration} "{action.reason}"')
return {"response": response}
@app.post("/unlock")
def unlock_server(rcon: RCONManager = Depends(get_rcon_manager)):
response = rcon.run('#unlock')
return {"response": response}
@app.post("/lock")
def lock_server(rcon: RCONManager = Depends(get_rcon_manager)):
response = rcon.run('#lock')
return {"response": response}

51
message_handler.py Normal file
View File

@@ -0,0 +1,51 @@
"""Listens to an RCON server for messages."""
import asyncio
import logging
import math
import berconpy as rcon
IP = "217.72.204.88"
PORT = 2313
PASSWORD = "u1BSlMM4LH0"
log = logging.getLogger("berconpy")
log.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s:%(levelname)s:%(name)s: %(message)s"))
log.addHandler(handler)
client = rcon.AsyncRCONClient()
@client.dispatch.on_login
async def on_login():
print("on_login")
@client.dispatch.on_message
async def on_message(message: str):
print("on_message:", message)
@client.dispatch.on_command
async def server_response_to_command(response: str):
# this event also includes keep alive commands we send to the server;
# for handling commands, reading the return value of
# `await client.send_command()` is the recommended method
if not response:
return print("on_command: <empty>")
print("on_command:", response)
# Other events are documented in AsyncRCONClient
async def main():
async with client.connect(IP, PORT, PASSWORD):
await asyncio.sleep(math.inf)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,4 +1,4 @@
fastapi
uvicorn[standard]
rcon
berconpy
pydantic