// spark.worksheet.sc — Spark worksheet (~1.2 KB): explore balldontlie games/stats JSON data
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum
  2. val spark = SparkSession
  3. .builder()
  4. .appName("balldontlie")
  5. .config("spark.master", "local")
  6. .getOrCreate()
  7. import spark.implicits._
  8. def readInput(filePath: String) = {
  9. spark.read.option("multiline", "true").json(filePath)
  10. }
  11. val games_df = readInput("games.json")
  12. val stats_df = readInput("stats.json")
  13. games_df.describe().show()
  14. games_df.printSchema()
  15. val games_df_short = games_df
  16. .selectExpr(
  17. "id as game_id",
  18. "home_team.id as home_team_id",
  19. "home_team.full_name as home_team_name",
  20. "home_team_score"
  21. )
  22. games_df_short.show()
  23. val stats_df_short = stats_df.selectExpr(
  24. "id",
  25. "game.id as game_id",
  26. "player.id as player_id",
  27. "concat(player.first_name, ' ', player.last_name) as player_name",
  28. "pts",
  29. "reb",
  30. "ast",
  31. "blk"
  32. )
  33. stats_df_short.show()
  34. stats_df_short
  35. .groupBy("player_id", "player_name")
  36. .sum("pts")
  37. .sort($"sum(pts)".desc)
  38. .show()
  39. val df_merge = stats_df_short
  40. .join(games_df_short, games_df_short("game_id") === stats_df_short("game_id"))
  41. .drop(games_df_short("game_id"))
  42. .orderBy($"pts".desc)
  43. df_merge.show()
  44. df_merge.write.option("header", true).mode("overwrite").csv("output")
  45. // spark.stop()