|
|
@@ -11,13 +11,13 @@ case class GamesAnalysis(teams: os.Path, games: os.Path, stats: os.Path) {
|
|
|
.getOrCreate()
|
|
|
import spark.implicits._
|
|
|
|
|
|
- val teams_df = readInput(teams)
|
|
|
- val games_df = readInput(games)
|
|
|
- val stats_df = readInput(stats)
|
|
|
+ val teamsDf = readInput(teams)
|
|
|
+ val gamesDf = readInput(games)
|
|
|
+ val statsDf = readInput(stats)
|
|
|
|
|
|
- val teams_games = teams_df
|
|
|
+ val teamsGamesDf = teamsDf
|
|
|
.as("t")
|
|
|
- .join(games_df.as("g"))
|
|
|
+ .join(gamesDf.as("g"))
|
|
|
.where(
|
|
|
$"g.home_team.id" === $"t.id"
|
|
|
|| $"g.visitor_team.id" === $"t.id"
|
|
|
@@ -37,12 +37,12 @@ case class GamesAnalysis(teams: os.Path, games: os.Path, stats: os.Path) {
|
|
|
$"team_score"
|
|
|
)
|
|
|
|
|
|
- val stats_games =
|
|
|
- stats_df
|
|
|
- .join(games_df, stats_df("game_id") === games_df("id"))
|
|
|
- .join(teams_df, stats_df("team.id") === teams_df("id"))
|
|
|
+ val statsGamesDf =
|
|
|
+ statsDf
|
|
|
+ .join(gamesDf, statsDf("game_id") === gamesDf("id"))
|
|
|
+ .join(teamsDf, statsDf("team.id") === teamsDf("id"))
|
|
|
|
|
|
- val stats_games_pts = stats_games
|
|
|
+ val statsByGameAndTeamDf = statsGamesDf
|
|
|
.groupBy($"game.id".alias("game_id"), $"team.id".alias("team_id"))
|
|
|
.agg(
|
|
|
sum($"pts").alias("pts"),
|
|
|
@@ -51,9 +51,9 @@ case class GamesAnalysis(teams: os.Path, games: os.Path, stats: os.Path) {
|
|
|
sum($"reb").alias("reb")
|
|
|
)
|
|
|
|
|
|
- val merged = teams_games
|
|
|
+ val finalDf = teamsGamesDf
|
|
|
.as("tg")
|
|
|
- .join(stats_games_pts.as("sgp"))
|
|
|
+ .join(statsByGameAndTeamDf.as("sgp"))
|
|
|
.where(
|
|
|
$"tg.game_id" === $"sgp.game_id"
|
|
|
&& $"tg.team_id" === $"sgp.team_id"
|