web.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. from datetime import datetime, timedelta
  2. from fastapi import FastAPI, Request, Depends
  3. from fastapi.responses import HTMLResponse
  4. from fastapi.staticfiles import StaticFiles
  5. from fastapi.templating import Jinja2Templates
  6. from babel.dates import format_datetime
  7. from babel import Locale
  8. import humanize
  9. from zoneinfo import ZoneInfo
  10. from media_observer.medias import media_collection
  11. from media_observer.storage import Storage
  12. from media_observer.similarity_index import SimilaritySearch
  13. def add_date_processing(_any):
  14. # At the moment this information comes out of nowhere but one might imagine that
  15. # in the future it can be deducted from the request or from information the
  16. # user gives.
  17. user_tz = ZoneInfo("Europe/Paris")
  18. def absolute_datetime(dt):
  19. return format_datetime(
  20. dt.astimezone(user_tz),
  21. format="EEEE d MMMM @ HH:mm",
  22. locale=Locale("fr", "FR"),
  23. )
  24. def duration(reference, target):
  25. humanize.activate("fr_FR")
  26. delta = target - reference
  27. if abs(delta.total_seconds()) < 10 * 60:
  28. return "en même temps"
  29. elif delta > timedelta(0):
  30. return f"{humanize.naturaldelta(delta)} après"
  31. else:
  32. return f"{humanize.naturaldelta(-delta)} avant"
  33. return {
  34. "absolute_datetime": absolute_datetime,
  35. "duration": duration,
  36. }
  37. def add_logos(_any):
  38. return {
  39. "logos_info": {
  40. m.name: {
  41. "background_color": m.logo_background_color,
  42. "content": m.logo_content,
  43. "src": m.logo_src,
  44. }
  45. for m in media_collection.values()
  46. }
  47. }
  48. app = FastAPI()
  49. app.mount("/static", StaticFiles(directory="static"), name="static")
  50. templates = Jinja2Templates(
  51. directory="templates", context_processors=[add_date_processing, add_logos]
  52. )
  53. storage = None
  54. async def get_db():
  55. global storage
  56. if storage is None:
  57. storage = await Storage.create()
  58. return storage
  59. sim_index: SimilaritySearch | None = None
  60. async def get_similarity_search(storage: Storage = Depends(get_db)):
  61. global sim_index
  62. if sim_index is None or sim_index.stale:
  63. sim_index = SimilaritySearch.load(storage)
  64. return sim_index
  65. else:
  66. return sim_index
  67. @app.get("/", response_class=HTMLResponse)
  68. async def index(request: Request, storage: Storage = Depends(get_db)):
  69. sites = await storage.list_sites()
  70. return templates.TemplateResponse(
  71. request=request,
  72. name="index.html",
  73. context={"page_title": "Observatoire des médias", "sites": sites},
  74. )
  75. @app.get("/sites/{id}/main_article", response_class=HTMLResponse)
  76. @app.get("/sites/{id}/main_article/{timestamp}", response_class=HTMLResponse)
  77. async def site_main_article_frontpage(
  78. request: Request,
  79. id: int,
  80. timestamp: datetime | None = None,
  81. storage: Storage = Depends(get_db),
  82. sim_index: SimilaritySearch = Depends(get_similarity_search),
  83. ):
  84. def get_article_sibling(after_before_articles, cond_fun):
  85. return min(
  86. [a for a in after_before_articles if cond_fun(a)],
  87. key=lambda a: abs(a["time_diff"]),
  88. default=None,
  89. )
  90. main_articles = await storage.list_neighbouring_main_articles(id, timestamp)
  91. [focused_article] = [
  92. a for a in main_articles if a["site_id"] == id and a["time_diff"] == 0
  93. ]
  94. simultaneous_articles = sorted(
  95. [a for a in main_articles if a["site_id"] != id and a["time_diff"] == 0],
  96. key=lambda a: a["site_id"],
  97. )
  98. same_site_articles = [
  99. a for a in main_articles if a["site_id"] == id and a["time_diff"] != 0
  100. ]
  101. focused_title_id = focused_article["title_id"]
  102. try:
  103. [(_, similar)] = await sim_index.search(
  104. [focused_title_id],
  105. 20,
  106. lambda s: s < 100 and s >= 25,
  107. )
  108. except KeyError:
  109. similar = []
  110. similar_by_id = {s[0]: s[1] for s in similar}
  111. similar_articles = await storage.list_articles_on_frontpage(
  112. list(similar_by_id.keys())
  113. )
  114. # A list of articles and score, sorted by descending score
  115. similar_articles_and_score = sorted(
  116. [
  117. (a, similar_by_id[a["title_id"]])
  118. for a in similar_articles
  119. if a["title_id"] != focused_title_id
  120. ],
  121. key=lambda a: a[1],
  122. reverse=True,
  123. )
  124. return templates.TemplateResponse(
  125. request=request,
  126. name="site_main_article_detail.html",
  127. context={
  128. "site_id": id,
  129. "focused": focused_article,
  130. "similar": similar_articles_and_score,
  131. "simultaneous_up": [
  132. a
  133. for a in simultaneous_articles
  134. if a["site_id"] < focused_article["site_id"]
  135. ],
  136. "simultaneous_down": [
  137. a
  138. for a in simultaneous_articles
  139. if a["site_id"] > focused_article["site_id"]
  140. ],
  141. "after": get_article_sibling(
  142. same_site_articles,
  143. lambda a: a["time_diff"] > 0,
  144. ),
  145. "before": get_article_sibling(
  146. same_site_articles,
  147. lambda a: a["time_diff"] < 0,
  148. ),
  149. },
  150. )