its-matrix-bot/its_matrix_bot/__init__.py

79 lines
2.5 KiB
Python
Executable file

import datetime
from nio.rooms import MatrixRoom
from nio.events.room_events import RoomMessage
import simplematrixbotlib as botlib
from .its_api import ItSyndikatApi
from .config import Config
class ItSyndikatBot:
bot: botlib.Bot
its_api: ItSyndikatApi
config: Config
def __init__(self, config: Config):
self.config = config
creds = botlib.Creds(
config.matrix_homeserver,
config.matrix_username,
access_token=config.matrix_access_token,
session_stored_file="",
)
self.its_api = ItSyndikatApi(config)
self.bot = botlib.Bot(creds)
self.bot.listener.on_message_event(self.on_message)
self.bot.run()
async def on_message(self, room, message):
m = botlib.MessageMatch(room, message, self.bot, self.config.command_prefix)
if m.is_not_from_this_bot() and m.prefix():
if m.command("echo"):
await self.echo(room, message, m.args())
elif m.command("isitopen"):
await self.isitopen(room, message)
elif m.command("spaceping"):
await self.spaceping(room, message)
else:
await self.bot.api.send_text_message(
room.room_id, f"Unknown command: {m.command()}"
)
async def reply(self, room, message, reply):
await self.bot.api.async_client.room_send(
room_id=room.room_id,
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": reply,
"m.relates_to": {"m.in_reply_to": {"event_id": message.event_id}},
},
)
async def echo(self, room, message, args):
await self.bot.api.send_text_message(
room.room_id, " ".join(arg for arg in args)
)
async def isitopen(self, room, message):
try:
status = await self.its_api.status()
if status["state"]["open"]:
date = datetime.datetime.fromtimestamp(status["state"]["lastchange"])
text = f"positive! space has been open since {date}"
else:
text = "negative!"
except Exception as e:
text = f"error checking space status: {e}"
await self.reply(room, message, text)
async def spaceping(self, room: MatrixRoom, message: RoomMessage):
await self.its_api.ping()
await self.reply(room, message, "Hello Space!")