import org.apache.spark.sql.SparkSession

// Start a local Spark session for exploring the balldontlie dataset.
val spark = SparkSession
  .builder()
  .appName("balldontlie")
  .config("spark.master", "local")
  .getOrCreate()

import spark.implicits._

// Read a multiline JSON file into a DataFrame.
def readInput(filePath: String) = {
  spark.read.option("multiline", "true").json(filePath)
}

val games_df = readInput("games.json")
val stats_df = readInput("stats.json")

games_df.describe().show()
games_df.printSchema()

// Keep only the game id plus the home team's id, name, and score.
val games_df_short = games_df
  .selectExpr(
    "id as game_id",
    "home_team.id as home_team_id",
    "home_team.full_name as home_team_name",
    "home_team_score"
  )
games_df_short.show()

// Flatten each stat line: ids, the player's full name, and the box-score columns.
val stats_df_short = stats_df.selectExpr(
  "id",
  "game.id as game_id",
  "player.id as player_id",
  "concat(player.first_name, ' ', player.last_name) as player_name",
  "pts",
  "reb",
  "ast",
  "blk"
)
stats_df_short.show()

// Total points per player, highest scorers first.
stats_df_short
  .groupBy("player_id", "player_name")
  .sum("pts")
  .sort($"sum(pts)".desc)
  .show()

// Join stat lines with their game and drop the duplicate join key.
val df_merge = stats_df_short
  .join(games_df_short, games_df_short("game_id") === stats_df_short("game_id"))
  .drop(games_df_short("game_id"))
  .orderBy($"pts".desc)
df_merge.show()

// Write the merged result as CSV (a directory named "output") with a header row.
df_merge.write.option("header", true).mode("overwrite").csv("output")

// spark.stop()
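
// Optional follow-up sketch, assuming the spark session, stats_df_short, and
// df_merge values defined above are still in scope. It redoes the per-player
// points total with an explicitly aliased aggregate column (instead of sorting
// on the auto-generated name "sum(pts)"), then reads the written CSV directory
// back in to confirm the output round-trips. The names top_scorers, reloaded,
// and total_pts are illustrative, not part of the original script.
import org.apache.spark.sql.functions.sum

val top_scorers = stats_df_short
  .groupBy("player_id", "player_name")
  .agg(sum($"pts").as("total_pts"))
  .orderBy($"total_pts".desc)
top_scorers.show()

// Reload the CSV output and spot-check that the row counts match.
val reloaded = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("output")
println(s"rows written: ${df_merge.count()}, rows reloaded: ${reloaded.count()}")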