Explorar el Código

Add almost all the code for dataframe transformation in the exam

theenglishway (time) hace 2 años
padre
commit
ecb903a6e1
Se ha modificado 1 fichero con 56 adiciones y 0 borrados
  1. 56 0
      src/main/scala/spark.worksheet.sc

+ 56 - 0
src/main/scala/spark.worksheet.sc

@@ -6,12 +6,14 @@ val spark = SparkSession
   .config("spark.master", "local")
   .getOrCreate()
 
+import org.apache.spark.sql.functions.when
 import spark.implicits._
 
 def readInput(filePath: String) = {
   spark.read.option("multiline", "true").json(filePath)
 }
 
+val teams_df = readInput("teams.json")
 val games_df = readInput("games.json")
 val stats_df = readInput("stats.json")
 games_df.describe().show()
@@ -55,3 +57,57 @@ df_merge.show()
 df_merge.write.option("header", true).mode("overwrite").csv("output")
 
 // spark.stop()
+
+/*
+THIS IS THE RELEVANT PART
+ */
+
+val stats_games =
+  stats_df
+    .join(games_df, stats_df("game_id") === games_df("id"))
+    .join(teams_df, stats_df("team.id") === teams_df("id"))
+
+stats_games.show()
+stats_games.count()
+// pandas-style sketch (not Scala) of the aggregation below: stats_games_teams[["game.id", "team.id", "pts", "ast", "blk", "reb"]].groupby(["game.id", "team.id"]).agg("pts").sum()
+
+val stats_games_pts = stats_games
+  .groupBy($"game.id".alias("game_id"), $"team.id".alias("team_id"))
+  .agg(Map("pts" -> "sum", "ast" -> "sum", "blk" -> "sum", "reb" -> "sum"))
+stats_games_pts.show()
+
+val teams_games = teams_df
+  .as("t")
+  .join(games_df.as("g"))
+  .where(
+    $"g.home_team.id" === $"t.id"
+      || $"g.visitor_team.id" === $"t.id"
+  )
+  .withColumn(
+    "team_score",
+    when(
+      $"t.id" === $"g.home_team.id",
+      $"g.home_team_score"
+    )
+      .otherwise($"g.visitor_team_score")
+  )
+  .select(
+    $"g.id".alias("game_id"),
+    $"t.id".alias("team_id"),
+    $"t.full_name".alias("team_full_name"),
+    $"team_score"
+  )
+
+teams_games.show()
+
+val merged = teams_games
+  .as("tg")
+  .join(stats_games_pts.as("sgp"))
+  .where(
+    $"tg.game_id" === $"sgp.game_id"
+      && $"tg.team_id" === $"sgp.team_id"
+  )
+  .drop($"sgp.game_id", $"sgp.team_id")
+merged.show()
+
+merged.write.option("header", true).mode("overwrite").csv("output")