import asyncio
import datetime
import logging

from nio.rooms import MatrixRoom
from nio.events.room_events import RoomMessage
import simplematrixbotlib as botlib

from .its_api import ItSyndikatApi
from .config import Config

logger = logging.getLogger(__name__)


class ItSyndikatBot:
    """Matrix bot that answers chat commands and announces open-state changes.

    The bot registers a message handler for the commands ``echo``,
    ``isitopen`` and ``spaceping`` and, while :meth:`run` is active, polls
    the status API and posts a notice to the configured rooms whenever the
    reported ``open`` flag changes.
    """

    # Seconds to wait between two status polls in :meth:`run`.
    POLL_INTERVAL_SECONDS = 60

    bot: botlib.Bot
    its_api: ItSyndikatApi
    config: Config

    def __init__(self, config: Config):
        """Create the Matrix bot and the status API client from *config*.

        Reads ``matrix_homeserver``, ``matrix_username`` and
        ``matrix_access_token`` from *config* for the Matrix credentials and
        registers :meth:`on_message` as the message-event listener.
        """
        self.config = config
        # Last observed ``open`` flag; ``None`` until the first successful
        # status query, so that no announcement is made on startup.
        self.current_open_state = None

        creds = botlib.Creds(
            config.matrix_homeserver,
            config.matrix_username,
            access_token=config.matrix_access_token,
            session_stored_file="",
        )
        self.its_api = ItSyndikatApi(config)
        self.bot = botlib.Bot(creds)
        self.bot.listener.on_message_event(self.on_message)

    async def run(self) -> None:
        """Run the bot until ``bot.main()`` finishes.

        A background task polls the status API every
        ``POLL_INTERVAL_SECONDS`` seconds and announces changes of the
        ``open`` flag. The task is cancelled when ``bot.main()`` returns or
        raises.
        """

        async def poll_for_changes() -> None:
            while True:
                try:
                    status = await self.its_api.status()
                    new_state = status["state"]["open"]
                    if (
                        self.current_open_state is not None
                        and new_state != self.current_open_state
                    ):
                        await self.announce_open_change(new_state)
                    # Only reached when the announcement (if any) succeeded,
                    # so a failed announcement is retried on the next poll.
                    self.current_open_state = new_state
                except Exception:
                    # Best effort: keep polling, but leave a trace of what
                    # went wrong instead of swallowing it silently.
                    logger.exception("polling the space status failed")
                await asyncio.sleep(self.POLL_INTERVAL_SECONDS)

        # Keep a reference: the event loop only holds weak references to
        # tasks, so an unreferenced task may be garbage-collected mid-run.
        poll_task = asyncio.create_task(poll_for_changes())
        try:
            await self.bot.main()
        finally:
            poll_task.cancel()

    async def on_message(self, room: MatrixRoom, message: RoomMessage) -> None:
        """Dispatch a prefixed command from another user to its handler.

        Messages sent by the bot itself or lacking the configured command
        prefix are ignored; unrecognized commands get an "Unknown command"
        reply.
        """
        m = botlib.MessageMatch(room, message, self.bot, self.config.command_prefix)
        if not (m.is_not_from_this_bot() and m.prefix()):
            return

        if m.command("echo"):
            await self.echo(room, message, m.args())
        elif m.command("isitopen"):
            await self.isitopen(room, message)
        elif m.command("spaceping"):
            await self.spaceping(room, message)
        else:
            await self.bot.api.send_text_message(
                room.room_id, f"Unknown command: {m.command()}"
            )

    async def announce_open_change(self, now_open: bool) -> None:
        """Send an ``m.notice`` about the new state to every announce room.

        The target rooms are read from ``config.isitopen_announce_rooms``.
        Errors from sending are not handled here and propagate to the caller.
        """
        if now_open:
            message = "opening IT-Syndikat - Ohai!"
        else:
            message = "closing IT-Syndikat - nap time!"

        for room_id in self.config.isitopen_announce_rooms:
            await self.bot.api.async_client.room_send(
                room_id=room_id,
                message_type="m.room.message",
                content={
                    "msgtype": "m.notice",
                    "body": message,
                },
            )

    async def reply(self, room: MatrixRoom, message: RoomMessage, reply: str) -> None:
        """Send *reply* to *room* as a text message replying to *message*."""
        await self.bot.api.async_client.room_send(
            room_id=room.room_id,
            message_type="m.room.message",
            content={
                "msgtype": "m.text",
                "body": reply,
                "m.relates_to": {"m.in_reply_to": {"event_id": message.event_id}},
            },
        )

    async def echo(self, room: MatrixRoom, message: RoomMessage, args) -> None:
        """Send the command arguments back to the room, joined by spaces."""
        await self.bot.api.send_text_message(room.room_id, " ".join(args))

    async def isitopen(self, room: MatrixRoom, message: RoomMessage) -> None:
        """Reply with the current open state reported by the status API.

        Any error while querying or interpreting the status is reported to
        the user as the reply text instead of being raised.
        """
        try:
            status = await self.its_api.status()
            is_open = status["state"]["open"]
            # NOTE(review): updating the shared state here means a change
            # first seen by this command is not announced by the poller in
            # run() -- confirm that this is intended.
            self.current_open_state = is_open
            if is_open:
                # ``lastchange`` is interpreted as a POSIX timestamp and
                # rendered as naive local time.
                date = datetime.datetime.fromtimestamp(status["state"]["lastchange"])
                text = f"positive! space has been open since {date}"
            else:
                text = "negative!"
        except Exception as e:
            logger.exception("checking the space status failed")
            text = f"error checking space status: {e}"
        await self.reply(room, message, text)

    async def spaceping(self, room: MatrixRoom, message: RoomMessage) -> None:
        """Call the API's ping endpoint and confirm it to the user.

        A failing ping is reported as the reply text, mirroring
        :meth:`isitopen`, rather than escaping from the message callback.
        """
        try:
            await self.its_api.ping()
            text = "Hello Space!"
        except Exception as e:
            logger.exception("pinging the space failed")
            text = f"error pinging space: {e}"
        await self.reply(room, message, text)