记录一下sparksql的dataframe 中常用的操作,spark在大数据处理方面有很广泛的应供,每天都在研究spark的源码,简单记录一下以便后续查阅,今天先简单整理一下,后续逐步完善.
版本:spark 2.0.1
数据显示
这个showString 是spark内部的方法,我们实际是调用不到的,但是我们调用的show方法最终都是调用了这个showString1
2
3
4
5
6
7
8/**
* Compose the string representing rows for output
*
* @param _numRows Number of rows to show
* @param truncate If set to more than 0, truncates strings to `truncate` characters and
* all cells will be aligned right.
*/
private[sql] def showString(_numRows: Int, truncate: Int = 20): String
将dataSet转换成dataFrame
datafram其实是按列来存储的dataset1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25/**
* Converts this strongly typed collection of data to generic `DataFrame` with columns renamed.
* This can be quite convenient in conversion from an RDD of tuples into a `DataFrame` with
* meaningful names. For example:
* {{{
* val rdd: RDD[(Int, String)] = ...
* rdd.toDF() // this implicit conversion creates a DataFrame with column name `_1` and `_2`
* rdd.toDF("id", "name") // this creates a DataFrame with column name "id" and "name"
* }}}
*
* @group basic
* @since 2.0.0
*/
@scala.annotation.varargs
def toDF(colNames: String*): DataFrame = {
require(schema.size == colNames.size,
"The number of columns doesn't match.\n" +
s"Old column names (${schema.size}): " + schema.fields.map(_.name).mkString(", ") + "\n" +
s"New column names (${colNames.size}): " + colNames.mkString(", "))
val newCols = logicalPlan.output.zip(colNames).map { case (oldAttribute, newName) =>
Column(oldAttribute).as(newName)
}
select(newCols : _*)
}
输出当前dataset的结构信息1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17/**
* Returns the schema of this Dataset.
*
* @group basic
* @since 1.6.0
*/
def schema: StructType = queryExecution.analyzed.schema
/**
* Prints the schema to the console in a nice tree format.
*
* @group basic
* @since 1.6.0
*/
// scalastyle:off println
def printSchema(): Unit = println(schema.treeString)
// scalastyle:on println
输出一些调试信息1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22/**
* Prints the plans (logical and physical) to the console for debugging purposes.
*
* @group basic
* @since 1.6.0
*/
def explain(extended: Boolean): Unit = {
val explain = ExplainCommand(queryExecution.logical, extended = extended)
sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect().foreach {
// scalastyle:off println
r => println(r.getString(0))
// scalastyle:on println
}
}
/**
* Prints the physical plan to the console for debugging purposes.
*
* @group basic
* @since 1.6.0
*/
def explain(): Unit = explain(extended = false)
输出列名以及每个列的类型1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17/**
* Returns all column names and their data types as an array.
*
* @group basic
* @since 1.6.0
*/
def dtypes: Array[(String, String)] = schema.fields.map { field =>
(field.name, field.dataType.toString)
}
/**
* Returns all column names as an array.
*
* @group basic
* @since 1.6.0
*/
def columns: Array[String] = schema.fields.map(_.name)
是否能够获取数据1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23/**
* Returns true if the `collect` and `take` methods can be run locally
* (without any Spark executors).
*
* @group basic
* @since 1.6.0
*/
def isLocal: Boolean = logicalPlan.isInstanceOf[LocalRelation]
/**
* Returns true if this Dataset contains one or more sources that continuously
* return data as it arrives. A Dataset that reads data from a streaming source
* must be executed as a `StreamingQuery` using the `start()` method in
* `DataStreamWriter`. Methods that return a single answer, e.g. `count()` or
* `collect()`, will throw an [[AnalysisException]] when there is a streaming
* source present.
*
* @group streaming
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
def isStreaming: Boolean = logicalPlan.isStreaming
检查点,以前没有用过,需要在研究一下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33/**
* Eagerly checkpoint a Dataset and return the new Dataset. Checkpointing can be used to truncate
* the logical plan of this Dataset, which is especially useful in iterative algorithms where the
* plan may grow exponentially. It will be saved to files inside the checkpoint
* directory set with `SparkContext#setCheckpointDir`.
*
* @group basic
* @since 2.1.0
*/
@Experimental
@InterfaceStability.Evolving
def checkpoint(): Dataset[T] = checkpoint(eager = true)
/**
* Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the
* logical plan of this Dataset, which is especially useful in iterative algorithms where the
* plan may grow exponentially. It will be saved to files inside the checkpoint
* directory set with `SparkContext#setCheckpointDir`.
*
* @group basic
* @since 2.1.0
*/
@Experimental
@InterfaceStability.Evolving
def checkpoint(eager: Boolean): Dataset[T] = {
val internalRdd = queryExecution.toRdd.map(_.copy())
internalRdd.checkpoint()
if (eager) {
internalRdd.count()
}
val physicalPlan = queryExecution.executedPlan
这个什么鬼需要在分析一下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 // Takes the first leaf partitioning whenever we see a `PartitioningCollection`. Otherwise the
// size of `PartitioningCollection` may grow exponentially for queries involving deep inner
// joins.
def firstLeafPartitioning(partitioning: Partitioning): Partitioning = {
partitioning match {
case p: PartitioningCollection => firstLeafPartitioning(p.partitionings.head)
case p => p
}
}
val outputPartitioning = firstLeafPartitioning(physicalPlan.outputPartitioning)
Dataset.ofRows(
sparkSession,
LogicalRDD(
logicalPlan.output,
internalRdd,
outputPartitioning,
physicalPlan.outputOrdering
)(sparkSession)).as[T]
}
水印???1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35/**
* :: Experimental ::
* Defines an event time watermark for this [[Dataset]]. A watermark tracks a point in time
* before which we assume no more late data is going to arrive.
*
* Spark will use this watermark for several purposes:
* - To know when a given time window aggregation can be finalized and thus can be emitted when
* using output modes that do not allow updates.
* - To minimize the amount of state that we need to keep for on-going aggregations,
* `mapGroupsWithState` and `dropDuplicates` operators.
*
* The current watermark is computed by looking at the `MAX(eventTime)` seen across
* all of the partitions in the query minus a user specified `delayThreshold`. Due to the cost
* of coordinating this value across partitions, the actual watermark used is only guaranteed
* to be at least `delayThreshold` behind the actual event time. In some cases we may still
* process records that arrive more than `delayThreshold` late.
*
* @param eventTime the name of the column that contains the event time of the row.
* @param delayThreshold the minimum delay to wait to data to arrive late, relative to the latest
* record that has been processed in the form of an interval
* (e.g. "1 minute" or "5 hours").
*
* @group streaming
* @since 2.1.0
*/
@Experimental
@InterfaceStability.Evolving
// We only accept an existing column name, not a derived column here as a watermark that is
// defined on a derived column cannot referenced elsewhere in the plan.
def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = withTypedPlan {
val parsedDelay =
Option(CalendarInterval.fromString("interval " + delayThreshold))
.getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)
}
数据展示
1 | /** |
数据的关联
1 |
|
排序分组
1 | /** |
提取指定的列1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24/**
* Selects column based on the column name and return it as a [[Column]].
*
* @note The column name can also reference to a nested column like `a.b`.
*
* @group untypedrel
* @since 2.0.0
*/
def apply(colName: String): Column = col(colName)
/**
* Selects column based on the column name and return it as a [[Column]].
*
* @note The column name can also reference to a nested column like `a.b`.
*
* @group untypedrel
* @since 2.0.0
*/
def col(colName: String): Column = colName match {
case "*" =>
Column(ResolvedStar(queryExecution.analyzed.output))
case _ =>
val expr = resolve(colName)
Column(expr)
}
别名
别名有给列取别名的也有给dataset取别名的这里是给当前dataset取别名
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* Returns a new Dataset with an alias set.
*
* @group typedrel
* @since 1.6.0
*/
def as(alias: String): Dataset[T] = withTypedPlan {
SubqueryAlias(alias, logicalPlan, None)
}
/**
* (Scala-specific) Returns a new Dataset with an alias set.
*
* @group typedrel
* @since 2.0.0
*/
def as(alias: Symbol): Dataset[T] = as(alias.name)
/**
* Returns a new Dataset with an alias set. Same as `as`.
*
* @group typedrel
* @since 2.0.0
*/
def alias(alias: String): Dataset[T] = as(alias)
/**
* (Scala-specific) Returns a new Dataset with an alias set. Same as `as`.
*
* @group typedrel
* @since 2.0.0
*/
def alias(alias: Symbol): Dataset[T] = as(alias)
查询
查询有很多种接口使用的方式不太一样1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151 /**
* Selects a set of column based expressions.
* {{{
* ds.select($"colA", $"colB" + 1)
* }}}
*
* @group untypedrel
* @since 2.0.0
*/
@scala.annotation.varargs
def select(cols: Column*): DataFrame = withPlan {
Project(cols.map(_.named), logicalPlan)
}
/**
* Selects a set of columns. This is a variant of `select` that can only select
* existing columns using column names (i.e. cannot construct expressions).
*
* {{{
* // The following two are equivalent:
* ds.select("colA", "colB")
* ds.select($"colA", $"colB")
* }}}
*
* @group untypedrel
* @since 2.0.0
*/
@scala.annotation.varargs
def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)
/**
* Selects a set of SQL expressions. This is a variant of `select` that accepts
* SQL expressions.
*
* {{{
* // The following are equivalent:
* ds.selectExpr("colA", "colB as newName", "abs(colC)")
* ds.select(expr("colA"), expr("colB as newName"), expr("abs(colC)"))
* }}}
*
* @group untypedrel
* @since 2.0.0
*/
按照表达式来查询
@scala.annotation.varargs
def selectExpr(exprs: String*): DataFrame = {
select(exprs.map { expr =>
Column(sparkSession.sessionState.sqlParser.parseExpression(expr))
}: _*)
}
/**
* :: Experimental ::
* Returns a new Dataset by computing the given [[Column]] expression for each element.
*
* {{{
* val ds = Seq(1, 2, 3).toDS()
* val newDS = ds.select(expr("value + 1").as[Int])
* }}}
*
* @group typedrel
* @since 1.6.0
*/
@Experimental
@InterfaceStability.Evolving
def select[U1](c1: TypedColumn[T, U1]): Dataset[U1] = {
implicit val encoder = c1.encoder
val project = Project(c1.withInputType(exprEnc, logicalPlan.output).named :: Nil,
logicalPlan)
if (encoder.flat) {
new Dataset[U1](sparkSession, project, encoder)
} else {
// Flattens inner fields of U1
new Dataset[Tuple1[U1]](sparkSession, project, ExpressionEncoder.tuple(encoder)).map(_._1)
}
}
/**
* Internal helper function for building typed selects that return tuples. For simplicity and
* code reuse, we do this without the help of the type system and then use helper functions
* that cast appropriately for the user facing interface.
*/
???这个查询怎么用
protected def selectUntyped(columns: TypedColumn[_, _]*): Dataset[_] = {
val encoders = columns.map(_.encoder)
val namedColumns =
columns.map(_.withInputType(exprEnc, logicalPlan.output).named)
val execution = new QueryExecution(sparkSession, Project(namedColumns, logicalPlan))
new Dataset(sparkSession, execution, ExpressionEncoder.tuple(encoders))
}
/**
* :: Experimental ::
* Returns a new Dataset by computing the given [[Column]] expressions for each element.
*
* @group typedrel
* @since 1.6.0
*/
@Experimental
@InterfaceStability.Evolving
def select[U1, U2](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2]): Dataset[(U1, U2)] =
selectUntyped(c1, c2).asInstanceOf[Dataset[(U1, U2)]]
/**
* :: Experimental ::
* Returns a new Dataset by computing the given [[Column]] expressions for each element.
*
* @group typedrel
* @since 1.6.0
*/
@Experimental
@InterfaceStability.Evolving
def select[U1, U2, U3](
c1: TypedColumn[T, U1],
c2: TypedColumn[T, U2],
c3: TypedColumn[T, U3]): Dataset[(U1, U2, U3)] =
selectUntyped(c1, c2, c3).asInstanceOf[Dataset[(U1, U2, U3)]]
/**
* :: Experimental ::
* Returns a new Dataset by computing the given [[Column]] expressions for each element.
*
* @group typedrel
* @since 1.6.0
*/
@Experimental
@InterfaceStability.Evolving
def select[U1, U2, U3, U4](
c1: TypedColumn[T, U1],
c2: TypedColumn[T, U2],
c3: TypedColumn[T, U3],
c4: TypedColumn[T, U4]): Dataset[(U1, U2, U3, U4)] =
selectUntyped(c1, c2, c3, c4).asInstanceOf[Dataset[(U1, U2, U3, U4)]]
/**
* :: Experimental ::
* Returns a new Dataset by computing the given [[Column]] expressions for each element.
*
* @group typedrel
* @since 1.6.0
*/
@Experimental
@InterfaceStability.Evolving
def select[U1, U2, U3, U4, U5](
c1: TypedColumn[T, U1],
c2: TypedColumn[T, U2],
c3: TypedColumn[T, U3],
c4: TypedColumn[T, U4],
c5: TypedColumn[T, U5]): Dataset[(U1, U2, U3, U4, U5)] =
selectUntyped(c1, c2, c3, c4, c5).asInstanceOf[Dataset[(U1, U2, U3, U4, U5)]]
过滤
过滤,这里的过滤和sql里面的where条件是相同的,查询满足一定条件的记录。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53/**
* Filters rows using the given condition.
* {{{
* // The following are equivalent:
* peopleDs.filter($"age" > 15)
* peopleDs.where($"age" > 15)
* }}}
*
* @group typedrel
* @since 1.6.0
*/
def filter(condition: Column): Dataset[T] = withTypedPlan {
Filter(condition.expr, logicalPlan)
}
/**
* Filters rows using the given SQL expression.
* {{{
* peopleDs.filter("age > 15")
* }}}
*
* @group typedrel
* @since 1.6.0
*/
def filter(conditionExpr: String): Dataset[T] = {
filter(Column(sparkSession.sessionState.sqlParser.parseExpression(conditionExpr)))
}
/**
* Filters rows using the given condition. This is an alias for `filter`.
* {{{
* // The following are equivalent:
* peopleDs.filter($"age" > 15)
* peopleDs.where($"age" > 15)
* }}}
*
* @group typedrel
* @since 1.6.0
*/
def where(condition: Column): Dataset[T] = filter(condition)
/**
* Filters rows using the given SQL expression.
* {{{
* peopleDs.where("age > 15")
* }}}
*
* @group typedrel
* @since 1.6.0
*/
def where(conditionExpr: String): Dataset[T] = {
filter(Column(sparkSession.sessionState.sqlParser.parseExpression(conditionExpr)))
}
分组查询
1 | /** |
reduce操作
1 | /** |
数据钻取与聚合操作
1 | /** |
集合的交并补接口
1 | /** |
取样与切分
1 | /** |
去重
1 | /** |
统计指定的列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/**
* Computes statistics for numeric and string columns, including count, mean, stddev, min, and
* max. If no columns are given, this function computes statistics for all numerical or string
* columns.
*
* This function is meant for exploratory data analysis, as we make no guarantee about the
* backward compatibility of the schema of the resulting Dataset. If you want to
* programmatically compute summary statistics, use the `agg` function instead.
*
* {{{
* ds.describe("age", "height").show()
*
* // output:
* // summary age height
* // count 10.0 10.0
* // mean 53.3 178.05
* // stddev 11.6 15.7
* // min 18.0 163.0
* // max 92.0 192.0
* }}}
*
* @group action
* @since 1.6.0
*/
获得指定列的描述性统计量
@scala.annotation.varargs
def describe(cols: String*): DataFrame = withPlan {
// The list of summary statistics to compute, in the form of expressions.
val statistics = List[(String, Expression => Expression)](
"count" -> ((child: Expression) => Count(child).toAggregateExpression()),
"mean" -> ((child: Expression) => Average(child).toAggregateExpression()),
"stddev" -> ((child: Expression) => StddevSamp(child).toAggregateExpression()),
"min" -> ((child: Expression) => Min(child).toAggregateExpression()),
"max" -> ((child: Expression) => Max(child).toAggregateExpression()))
val outputCols =
(if (cols.isEmpty) aggregatableColumns.map(usePrettyExpression(_).sql) else cols).toList
val ret: Seq[Row] = if (outputCols.nonEmpty) {
val aggExprs = statistics.flatMap { case (_, colToAgg) =>
outputCols.map(c => Column(Cast(colToAgg(Column(c).expr), StringType)).as(c))
}
val row = groupBy().agg(aggExprs.head, aggExprs.tail: _*).head().toSeq
// Pivot the data so each summary is one row
row.grouped(outputCols.size).toSeq.zip(statistics).map { case (aggregation, (statistic, _)) =>
Row(statistic :: aggregation.toList: _*)
}
} else {
// If there are no output columns, just output a single column that contains the stats.
statistics.map { case (name, _) => Row(name) }
}
// All columns are string type
val schema = StructType(
StructField("summary", StringType) :: outputCols.map(StructField(_, StringType))).toAttributes
// `toArray` forces materialization to make the seq serializable
LocalRelation.fromExternalRows(schema, ret.toArray.toSeq)
}
/**
* Returns the first `n` rows.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*
* @group action
* @since 1.6.0
*/
取得前n行数据
def head(n: Int): Array[T] = withAction("head", limit(n).queryExecution)(collectFromPlan)
/**
* Returns the first row.
* @group action
* @since 1.6.0
*/
def head(): T = head(1).head
/**
* Returns the first row. Alias for head().
* @group action
* @since 1.6.0
*/
def first(): T = head()
/**
* Concise syntax for chaining custom transformations.
* {{{
* def featurize(ds: Dataset[T]): Dataset[U] = ...
*
* ds
* .transform(featurize)
* .transform(...)
* }}}
*
* @group typedrel
* @since 1.6.0
*/
转换??
def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this)
/**
* :: Experimental ::
* (Scala-specific)
* Returns a new Dataset that only contains elements where `func` returns `true`.
*
* @group typedrel
* @since 1.6.0
*/
过滤
@Experimental
@InterfaceStability.Evolving
def filter(func: T => Boolean): Dataset[T] = {
withTypedPlan(TypedFilter(func, logicalPlan))
}
/**
* :: Experimental ::
* (Java-specific)
* Returns a new Dataset that only contains elements where `func` returns `true`.
*
* @group typedrel
* @since 1.6.0
*/
@Experimental
@InterfaceStability.Evolving
def filter(func: FilterFunction[T]): Dataset[T] = {
withTypedPlan(TypedFilter(func, logicalPlan))
}
数据的转换
1 | /** |
数据的提取与聚合
1 | /** |
分区
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* Returns a new Dataset that has exactly `numPartitions` partitions.
*
* @group typedrel
* @since 1.6.0
*/
分区
def repartition(numPartitions: Int): Dataset[T] = withTypedPlan {
Repartition(numPartitions, shuffle = true, logicalPlan)
}
/**
* Returns a new Dataset partitioned by the given partitioning expressions into
* `numPartitions`. The resulting Dataset is hash partitioned.
*
* This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
*
* @group typedrel
* @since 2.0.0
*/
@scala.annotation.varargs
def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions)
}
/**
* Returns a new Dataset partitioned by the given partitioning expressions, using
* `spark.sql.shuffle.partitions` as number of partitions.
* The resulting Dataset is hash partitioned.
*
* This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
*
* @group typedrel
* @since 2.0.0
*/
@scala.annotation.varargs
def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
RepartitionByExpression(
partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions)
}
/**
* Returns a new Dataset that has exactly `numPartitions` partitions.
* Similar to coalesce defined on an `RDD`, this operation results in a narrow dependency, e.g.
* if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
* the 100 new partitions will claim 10 of the current partitions. If a larger number of
* partitions is requested, it will stay at the current number of partitions.
*
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitions = 1). To avoid this,
* you can call repartition. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*
* @group typedrel
* @since 1.6.0
*/
def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan {
Repartition(numPartitions, shuffle = false, logicalPlan)
}
/**
* Returns a new Dataset that contains only the unique rows from this Dataset.
* This is an alias for `dropDuplicates`.
*
* @note Equality checking is performed directly on the encoded representation of the data
* and thus is not affected by a custom `equals` function defined on `T`.
*
* @group typedrel
* @since 2.0.0
*/
def distinct(): Dataset[T] = dropDuplicates()
数据的持久化
数据的持久化和缓存策略,一般我们操作rdd都是延迟计算,但是当我们多次重复使用一个rdd的时候可以选择将其缓存而不是每次进行一个计算,可以提高效率。
1 | /** |
注册临时表
通过注册可以将一个dataset直接当作一个表来操作,这样就可以直接通过sql来执行了,不过返回的结果又是一个dataset1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86/**
* Registers this Dataset as a temporary table using the given name. The lifetime of this
* temporary table is tied to the [[SparkSession]] that was used to create this Dataset.
*
* @group basic
* @since 1.6.0
*/
注册
@deprecated("Use createOrReplaceTempView(viewName) instead.", "2.0.0")
def registerTempTable(tableName: String): Unit = {
createOrReplaceTempView(tableName)
}
/**
* Creates a local temporary view using the given name. The lifetime of this
* temporary view is tied to the [[SparkSession]] that was used to create this Dataset.
*
* Local temporary view is session-scoped. Its lifetime is the lifetime of the session that
* created it, i.e. it will be automatically dropped when the session terminates. It's not
* tied to any databases, i.e. we can't use `db1.view1` to reference a local temporary view.
*
* @throws AnalysisException if the view name is invalid or already exists
*
* @group basic
* @since 2.0.0
*/
创建表
@throws[AnalysisException]
def createTempView(viewName: String): Unit = withPlan {
createTempViewCommand(viewName, replace = false, global = false)
}
/**
* Creates a local temporary view using the given name. The lifetime of this
* temporary view is tied to the [[SparkSession]] that was used to create this Dataset.
*
* @group basic
* @since 2.0.0
*/
def createOrReplaceTempView(viewName: String): Unit = withPlan {
createTempViewCommand(viewName, replace = true, global = false)
}
/**
* Creates a global temporary view using the given name. The lifetime of this
* temporary view is tied to this Spark application.
*
* Global temporary view is cross-session. Its lifetime is the lifetime of the Spark application,
* i.e. it will be automatically dropped when the application terminates. It's tied to a system
* preserved database `global_temp`, and we must use the qualified name to refer a global temp
* view, e.g. `SELECT * FROM global_temp.view1`.
*
* @throws AnalysisException if the view name is invalid or already exists
*
* @group basic
* @since 2.1.0
*/
@throws[AnalysisException]
def createGlobalTempView(viewName: String): Unit = withPlan {
createTempViewCommand(viewName, replace = false, global = true)
}
private def createTempViewCommand(
viewName: String,
replace: Boolean,
global: Boolean): CreateViewCommand = {
val viewType = if (global) GlobalTempView else LocalTempView
val tableIdentifier = try {
sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName)
} catch {
case _: ParseException => throw new AnalysisException(s"Invalid view name: $viewName")
}
CreateViewCommand(
name = tableIdentifier,
userSpecifiedColumns = Nil,
comment = None,
properties = Map.empty,
originalText = None,
child = logicalPlan,
allowExisting = false,
replace = replace,
viewType = viewType)
}
数据保存
数据保存有一个专门的write类来处理,这里就是调用write方法返回一个write对象来实现的
1 | /** |
转换成json格式
1 | /** |
获取文件列表
可以获取当前dataSet都加载了那些文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17/**
* Returns a best-effort snapshot of the files that compose this Dataset. This method simply
* asks each constituent BaseRelation for its respective files and takes the union of all results.
* Depending on the source relations, this may not find all input files. Duplicates are removed.
*
* @group basic
* @since 2.0.0
*/
def inputFiles: Array[String] = {
val files: Seq[String] = queryExecution.optimizedPlan.collect {
case LogicalRelation(fsBasedRelation: FileRelation, _, _) =>
fsBasedRelation.inputFiles
case fr: FileRelation =>
fr.inputFiles
}.flatten
files.toSet.toArray
}