170 lines
7.8 KiB
Python
170 lines
7.8 KiB
Python
import random
import re
from datetime import datetime
from io import BytesIO
from itertools import chain
from math import sinh, atan, pi, log, tan, cos, ceil

import geopandas as gpd
import requests
from geopy.distance import geodesic
from geopy.exc import GeopyError
from geopy.geocoders import Nominatim
from nextcord import File
from PIL import Image
from shapely.geometry import Point
from tabulate import tabulate
from unidecode import unidecode

from settings import *
|
|
|
|
# Module-wide Nominatim client; Game.parse calls its reverse() method to
# describe the place a wrong guess points to.
geolocator = Nominatim(user_agent="discord-bot-spoutnik")
|
|
|
|
def tile_to_wgs84(x, y, z):
    """Convert tile indices at zoom level *z* to a ``(lat, lon)`` pair in degrees.

    The result is the position of the tile's north-west (top-left) corner;
    fractional *x* / *y* values address points inside a tile.
    """
    tiles_per_axis = 2 ** z
    latitude = atan(sinh(pi * (1 - 2 * y / tiles_per_axis))) * 180 / pi
    longitude = (x / tiles_per_axis - 0.5) * 360
    return latitude, longitude
|
|
|
|
|
|
def wgs84_to_tile(lat, lon, z):
    """Return the ``(x, y)`` indices of the zoom-*z* tile containing a point.

    *lat* and *lon* are in degrees. Both indices are truncated with ``int``.
    """
    tiles_per_axis = 2 ** z
    lat_rad = lat * pi / 180
    # Web-Mercator projection of the latitude.
    mercator_y = log(tan(lat_rad) + 1 / cos(lat_rad))
    column = int((lon + 180) / 360 * tiles_per_axis)
    row = int((1 - mercator_y / pi) / 2 * tiles_per_axis)
    return column, row
|
|
|
|
|
|
def random_point(filename):
    """Draw a random point lying inside the geometries stored in *filename*.

    The file is read with geopandas and all its geometries are merged into
    one shape. Candidates are drawn uniformly from the shape's bounding box
    and rejected until one falls inside the shape.

    Returns:
        The accepted point as a ``(y, x)`` tuple, i.e. the coordinates in
        reverse order (callers in this file unpack it as latitude, longitude).
    """
    area = gpd.read_file(filename).union_all()
    x_low, y_low, x_high, y_high = area.bounds

    while True:
        candidate = Point(random.uniform(x_low, x_high), random.uniform(y_low, y_high))
        if not area.contains(candidate):
            continue
        x, y = candidate.coords[0]
        return y, x
|
|
|
|
def clean_address(address):
    """Return *address* without the verbose Lausanne district/canton/postcode suffix."""
    lausanne_suffix = r", Lausanne, District de Lausanne, Vaud, \d+, Suisse"
    return re.sub(lausanne_suffix, "", address)
|
|
|
|
class Game:
    """State and rules of one satellite-picture guessing game, bound to a channel.

    A round starts with :meth:`reset`, which posts a 2x2 mosaic of satellite
    tiles. Players then answer with coordinates, which :meth:`parse` scores
    against the centre of that mosaic.

    NOTE(review): the HTTP (``requests``) and geocoding (``geopy``) calls made
    here are synchronous, so they block the event loop while they run.
    """

    # The mosaic is built from tiles assumed to be 256 px wide and high.
    _TILE_SIZE = 256
    _TILE_URL = "https://khms2.google.com/kh/v=1000?x={x}&y={y}&z={z}"
    # Seconds to wait for a tile before giving up instead of hanging forever.
    _TILE_TIMEOUT = 10
    _RESET_RE = re.compile(r"^RESET( [A-Z]+)?$")
    # "<lat> <sep> <lon>" with '.' or ',' as decimal mark; a leading minus is
    # accepted so that southern / western coordinates can be submitted.
    _COORDS_RE = re.compile(r"^(-?\d{1,2}[.,]\d+) *[ ,.;:] *(-?\d{1,3}[.,]\d+)$")
    _UNKNOWN_PLACE = "un lieu apparemment invalide"

    def __init__(self, channel):
        """Create an idle game for *channel*; no round runs until :meth:`reset`."""
        self.channel = channel
        self.target = None          # (lat, lon) to find, None before the first round
        self.preset = None          # preset mapping of the current round
        self.file = f"/tmp/spoutnik_{channel}.jpg"  # mosaic image of this channel
        self.winner = None          # author of the winning guess, if any
        self.last_player = None     # author of the latest accepted guess
        self.last_datetime = None   # when that guess was made
        self.reset_data = self._fresh_reset_data()
        self.jump_url = None        # link to the message announcing the round

    @staticmethod
    def _fresh_reset_data():
        """Return an empty vote table: lower-cased preset key -> [label, voters]."""
        return {key.lower(): [value["label"], set()] for key, value in SPOUTNIK_PRESETS.items()}

    @staticmethod
    def _maps_url(lat, lon, zoom):
        """Return the Google Maps URL centred on (*lat*, *lon*) at *zoom*."""
        return f"https://www.google.ch/maps/@{lat},{lon},{zoom}z"

    @staticmethod
    def _blur_distance(distance):
        """Round a distance in metres up to the next km (above 1 km) or 100 m."""
        if distance > 1000:
            return f"{ceil(distance / 1000)} km"
        return f"{ceil(distance / 100) * 100} m"

    @classmethod
    def _parse_coordinates(cls, guess):
        """Return ``(lat, lon)`` parsed from *guess*, or None if it is not a valid pair.

        Latitudes outside [-90, 90] and longitudes outside [-180, 180] are
        rejected here: geopy raises ValueError on such latitudes, and they are
        not WGS 84 coordinates anyway.
        """
        match = cls._COORDS_RE.match(guess)
        if match is None:
            return None
        lat, lon = (float(group.replace(",", ".")) for group in match.groups())
        if abs(lat) > 90 or abs(lon) > 180:
            return None
        return lat, lon

    @classmethod
    def _fetch_tile(cls, x, y, z):
        """Download one satellite tile and return it as a PIL image."""
        response = requests.get(cls._TILE_URL.format(x=x, y=y, z=z), timeout=cls._TILE_TIMEOUT)
        # Fail loudly on an HTTP error rather than feeding an error page to PIL.
        response.raise_for_status()
        return Image.open(BytesIO(response.content))

    @classmethod
    def _build_mosaic(cls, tile_x, tile_y, z):
        """Return the 2x2 mosaic whose top-left tile is (*tile_x*, *tile_y*)."""
        size = cls._TILE_SIZE
        mosaic = Image.new("RGB", (size * 2, size * 2))
        for dx, dy in ((0, 0), (1, 0), (0, 1), (1, 1)):
            mosaic.paste(cls._fetch_tile(tile_x + dx, tile_y + dy, z), (dx * size, dy * size))
        return mosaic

    @classmethod
    def _describe_location(cls, lat, lon):
        """Return a French address for the point, or a placeholder if none is available."""
        try:
            location = geolocator.reverse((lat, lon), language="fr")
        except (ValueError, GeopyError):
            return cls._UNKNOWN_PLACE
        # reverse() returns None when the geocoder has no result for the point.
        if location is None:
            return cls._UNKNOWN_PLACE
        return clean_address(location.address)

    async def reset(self, preset, force=False):
        """Start a new round on *preset* and announce it in the channel.

        Unless *force* is true, nothing happens while an unfinished round has
        received a guess less than ``SPOUTNIK_EXPIRY_TIMEOUT`` seconds ago.
        If the previous round was never won, its location is revealed first.

        Args:
            preset: mapping providing ``"zoom"``, ``"geojson_file"``,
                ``"label"`` and ``"max_distance"``.
            force: skip the expiry check.

        Errors raised while downloading or assembling the image propagate to
        the caller; the game state is only updated once the image is written.
        """
        if not force and not self.winner and self.last_datetime:
            idle = (datetime.now() - self.last_datetime).total_seconds()
            if idle < SPOUTNIK_EXPIRY_TIMEOUT:
                return

        output = ""
        if self.target and not self.winner:
            previous = self._maps_url(self.target[0], self.target[1], self.preset["zoom"] + 1)
            output = f":expressionless: Le lieu précédent était [ici](<{previous}>).\n\n"

        # Tiles are fetched one zoom level deeper than the preset's zoom.
        tile_zoom = preset["zoom"] + 1
        random_lat, random_lon = random_point(preset["geojson_file"])
        tile_x, tile_y = wgs84_to_tile(random_lat, random_lon, tile_zoom)
        self._build_mosaic(tile_x, tile_y, tile_zoom).save(self.file)

        self.preset = preset
        # The mosaic's centre is the top-left corner of its bottom-right tile.
        self.target = tile_to_wgs84(tile_x + 1, tile_y + 1, tile_zoom)
        self.winner = None
        self.last_player = None
        self.last_datetime = None
        self.reset_data = self._fresh_reset_data()

        label = preset["label"]
        max_distance = preset["max_distance"]
        output += f":satellite_orbital: Il y a un nouveau lieu à trouver ! Le terrain de jeu est {label}."
        output += f"\n\nDonnez-moi les coordonnées GPS du centre de l'image, à {max_distance} m près."
        output += "\n\n*Sur le site Google Maps, clic droit puis clic sur les coordonnées.*"
        output += "\n*Sur l'app Google Maps, appui long puis appui sur les coordonnées.*"
        announcement = await self.channel.send(output, file=File(self.file))
        self.jump_url = announcement.jump_url

    async def _handle_reset_vote(self, message, choice):
        """Record a vote for preset *choice* and reset once a preset has enough votes."""
        author = message.author
        if any(author in voters for _, voters in self.reset_data.values()):
            await self.channel.send(f"{author.mention} : tu as déjà proposé une réinitialisation")
            return

        if choice not in self.reset_data:
            options = "\n".join(f"* `{key}` : {label}" for key, (label, _) in self.reset_data.items())
            await self.channel.send(f"{author.mention} : je ne connais pas ce preset, les options sont :\n{options}")
            return

        self.reset_data[choice][1].add(author)

        chosen = None
        rows = []
        for key, (label, voters) in self.reset_data.items():
            rows.append([key, label, f"{len(voters)}/{SPOUTNIK_RESETTERS_NEEDED}"])
            if len(voters) >= SPOUTNIK_RESETTERS_NEEDED:
                chosen = key
        table = "```\n" + tabulate(rows, tablefmt="rounded_grid") + "\n```"
        await self.channel.send(f"Réinitialisation :\n{table}")

        if chosen is not None:
            # Vote keys are lower-cased, so look the preset up case-insensitively.
            presets = {key.lower(): value for key, value in SPOUTNIK_PRESETS.items()}
            await self.reset(preset=presets[chosen], force=True)

    async def parse(self, message):
        """Handle a channel message: either a reset vote or a coordinate guess.

        Messages that are neither, guesses sent before any round started or
        after the round was won, and coordinates outside the WGS 84 range are
        ignored silently.
        """
        guess = unidecode(message.content.strip()).upper()

        # if it's the special keyword 'reset', consider resetting
        if SPOUTNIK_RESETTERS_NEEDED != 0 and (vote := self._RESET_RE.match(guess)):
            await self._handle_reset_vote(message, (vote.group(1) or "").strip().lower())
            return

        # if somebody has already won, or the game was never initialized,
        # return silently
        if self.winner or not self.target:
            return

        # same if the message is not WGS 84 coordinates
        coordinates = self._parse_coordinates(guess)
        if coordinates is None:
            return
        lat, lon = coordinates

        # enforce alternation between players when it is enabled
        if SPOUTNIK_FORCE_ALTERNATION and message.author == self.last_player:
            elapsed = (datetime.now() - self.last_datetime).total_seconds()
            if elapsed < SPOUTNIK_ALTERNATION_TIMEOUT:
                await message.add_reaction("\N{BUSTS IN SILHOUETTE}")
                remaining = ceil(SPOUTNIK_ALTERNATION_TIMEOUT - elapsed)
                plural = "s" if remaining > 1 else ""
                await self.channel.send(f"{message.author.mention} : tu pourras rejouer dans {remaining} seconde{plural}")
                return

        # everything is OK, validate the proposal
        self.last_player = message.author
        self.last_datetime = datetime.now()

        distance = geodesic((lat, lon), self.target).meters
        link_zoom = self.preset["zoom"] + 1

        if distance <= self.preset["max_distance"]:
            self.winner = message.author
            self.reset_data = self._fresh_reset_data()
            exact = self._maps_url(self.target[0], self.target[1], link_zoom)
            output = f":trophy: YOUPI ! {message.author.mention} a trouvé le lieu ! :trophy:"
            output += f"\n\n[Lieu exact](<{exact}>)"
            await self.channel.send(output)
            return

        address = self._describe_location(lat, lon)
        blurred_distance = self._blur_distance(distance)
        proposed = self._maps_url(lat, lon, link_zoom)
        output = f"{message.author.mention} propose [un point](<{proposed}>) proche de {address}."
        output += f"\n\nC'est pas ça chef, mais tu es à moins de {blurred_distance} de [la cible](<{self.jump_url}>) !"
        await self.channel.send(output)