@@ -18,15 +18,225 @@ package org.apache.spark.sql.execution.datasources.parquet
1818
1919import org .apache .gluten .config .GlutenConfig
2020
21- import org .apache .spark .SparkConf
22- import org .apache .spark .sql .GlutenSQLTestsTrait
21+ import org .apache .spark .{SparkConf , SparkException }
22+ import org .apache .spark .sql .{DataFrame , GlutenSQLTestsTrait }
23+ import org .apache .spark .sql .execution .datasources .SchemaColumnConvertNotSupportedException
24+ import org .apache .spark .sql .functions .col
25+ import org .apache .spark .sql .internal .{LegacyBehaviorPolicy , SQLConf }
26+ import org .apache .spark .sql .types ._
27+ import org .apache .spark .sql .types .DecimalType .{ByteDecimal , IntDecimal , LongDecimal , ShortDecimal }
28+
29+ import org .apache .hadoop .fs .Path
30+ import org .apache .parquet .column .{Encoding , ParquetProperties }
31+ import org .apache .parquet .format .converter .ParquetMetadataConverter
32+ import org .apache .parquet .hadoop .{ParquetFileReader , ParquetOutputFormat }
33+
34+ import java .io .File
2335
class GlutenParquetTypeWideningSuite extends ParquetTypeWideningSuite with GlutenSQLTestsTrait {

  import testImplicits._

  // Disable native writer so that writeParquetFiles() uses Spark's Parquet writer.
  // This suite tests the READ path. The native writer doesn't produce
  // DELTA_BINARY_PACKED/DELTA_BYTE_ARRAY encodings that the parent test's
  // V2 encoding assertions expect.
  override def sparkConf: SparkConf =
    super.sparkConf.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "false")

  // ====== Private methods copied from ParquetTypeWideningSuite ======
  // These are private in the parent class, so we must copy them to use in overridden tests.
  // The key change: removed withAllParquetReaders wrapper since Velox native reader
  // always behaves like the vectorized reader.

  /**
   * Writes `values` as `fromType` and reads them back as `toType`, covering dictionary
   * encoding on/off and — for date/timestamp target types — both timestamp rebase modes.
   *
   * @param values      string representations of the values to write
   * @param fromType    type the values are written as
   * @param toType      type the values are read back as
   * @param expectError when true, the read must fail with a conversion error; when false,
   *                    the result must equal `values` cast to `toType`
   */
  private def checkAllParquetReaders(
      values: Seq[String],
      fromType: DataType,
      toType: DataType,
      expectError: Boolean): Unit = {
    // LEGACY rebase mode only matters for date/timestamp reads; other types use CORRECTED only.
    val timestampRebaseModes = toType match {
      case _: TimestampNTZType | _: DateType =>
        Seq(LegacyBehaviorPolicy.CORRECTED, LegacyBehaviorPolicy.LEGACY)
      case _ =>
        Seq(LegacyBehaviorPolicy.CORRECTED)
    }
    for {
      dictionaryEnabled <- Seq(true, false)
      timestampRebaseMode <- timestampRebaseModes
    }
      withClue(
        s"with dictionary encoding '$dictionaryEnabled' with timestamp rebase mode " +
          s"'$timestampRebaseMode'") {
        withAllParquetWriters {
          withTempDir {
            dir =>
              val expected =
                writeParquetFiles(dir, values, fromType, dictionaryEnabled, timestampRebaseMode)
              if (expectError) {
                val exception = intercept[SparkException] {
                  readParquetFiles(dir, toType).collect()
                }
                // The failure surfaces differently depending on where the conversion is
                // rejected, so accept any of the three known causes.
                assert(
                  exception.getCause
                    .isInstanceOf[SchemaColumnConvertNotSupportedException] ||
                    exception.getCause
                      .isInstanceOf[org.apache.parquet.io.ParquetDecodingException] ||
                    exception.getCause.getMessage.contains("PARQUET_CONVERSION_FAILURE"))
              } else {
                checkAnswer(readParquetFiles(dir, toType), expected.select($"a".cast(toType)))
              }
          }
        }
      }
  }

  /** Reads all Parquet files in `dir` with a schema forcing column `a` to `dataType`. */
  private def readParquetFiles(dir: File, dataType: DataType): DataFrame = {
    spark.read.schema(s"a ${dataType.sql}").parquet(dir.getAbsolutePath)
  }

  /**
   * Writes `values` cast to `dataType` into `dir` as Parquet and returns the written
   * DataFrame. When `dictionaryEnabled` is true the values are repeated so that dictionary
   * encoding is actually used, and that is asserted (except for byte-array decimal types,
   * which are exempted from the assertion). For Parquet V2 writers, also asserts the
   * expected column encodings.
   */
  private def writeParquetFiles(
      dir: File,
      values: Seq[String],
      dataType: DataType,
      dictionaryEnabled: Boolean,
      timestampRebaseMode: LegacyBehaviorPolicy.Value = LegacyBehaviorPolicy.CORRECTED)
      : DataFrame = {
    // Repeat values 10x so the writer chooses dictionary encoding when it is enabled.
    val repeatedValues = List.fill(if (dictionaryEnabled) 10 else 1)(values).flatten
    val df = repeatedValues.toDF("a").select(col("a").cast(dataType))
    withSQLConf(
      ParquetOutputFormat.ENABLE_DICTIONARY -> dictionaryEnabled.toString,
      SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> timestampRebaseMode.toString) {
      df.write.mode("overwrite").parquet(dir.getAbsolutePath)
    }

    if (dictionaryEnabled && !DecimalType.isByteArrayDecimalType(dataType)) {
      assertAllParquetFilesDictionaryEncoded(dir)
    }

    // Detect whether the session is configured to write Parquet V2 files.
    val isParquetV2 = spark.conf
      .getOption(ParquetOutputFormat.WRITER_VERSION)
      .contains(ParquetProperties.WriterVersion.PARQUET_2_0.toString)
    if (isParquetV2) {
      if (dictionaryEnabled) {
        assertParquetV2Encoding(dir, Encoding.PLAIN)
      } else if (DecimalType.is64BitDecimalType(dataType)) {
        assertParquetV2Encoding(dir, Encoding.DELTA_BINARY_PACKED)
      } else if (DecimalType.isByteArrayDecimalType(dataType)) {
        assertParquetV2Encoding(dir, Encoding.DELTA_BYTE_ARRAY)
      }
    }
    df
  }

  /** Asserts that every column chunk of every Parquet file in `dir` has a dictionary page. */
  private def assertAllParquetFilesDictionaryEncoded(dir: File): Unit = {
    dir.listFiles(_.getName.endsWith(".parquet")).foreach {
      file =>
        val parquetMetadata = ParquetFileReader.readFooter(
          spark.sessionState.newHadoopConf(),
          new Path(dir.toString, file.getName),
          ParquetMetadataConverter.NO_FILTER)
        parquetMetadata.getBlocks.forEach {
          block =>
            block.getColumns.forEach {
              col =>
                assert(
                  col.hasDictionaryPage,
                  "This test covers dictionary encoding but column " +
                    s"'${col.getPath.toDotString}' in the test data is not dictionary encoded.")
            }
        }
    }
  }

  /** Asserts that every column chunk of every Parquet file in `dir` uses `expectedEncoding`. */
  private def assertParquetV2Encoding(dir: File, expectedEncoding: Encoding): Unit = {
    dir.listFiles(_.getName.endsWith(".parquet")).foreach {
      file =>
        val parquetMetadata = ParquetFileReader.readFooter(
          spark.sessionState.newHadoopConf(),
          new Path(dir.toString, file.getName),
          ParquetMetadataConverter.NO_FILTER)
        parquetMetadata.getBlocks.forEach {
          block =>
            block.getColumns.forEach {
              col =>
                assert(
                  col.getEncodings.contains(expectedEncoding),
                  s"Expected column '${col.getPath.toDotString}' " +
                    s"to use encoding $expectedEncoding " +
                    s"but found ${col.getEncodings}."
                )
            }
        }
    }
  }

  // ====== Override tests ======
  // Velox native reader always behaves like Spark's vectorized reader (no parquet-mr fallback).
  // In the parent tests, `expectError` is conditional on PARQUET_VECTORIZED_READER_ENABLED:
  // parquet-mr allows conversions that the vectorized reader rejects.
  // Since Velox always rejects, we override with expectError = true.

  for {
    (values: Seq[String], fromType: DataType, toType: DecimalType) <- Seq(
      (Seq("1", "2"), ByteType, DecimalType(1, 0)),
      (Seq("1", "2"), ByteType, ByteDecimal),
      (Seq("1", "2"), ShortType, ByteDecimal),
      (Seq("1", "2"), ShortType, ShortDecimal),
      (Seq("1", "2"), IntegerType, ShortDecimal),
      (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision + 1, 1)),
      (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision + 1, 1)),
      (Seq("1", "2"), LongType, IntDecimal),
      (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision - 1, 0)),
      (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision - 1, 0)),
      (Seq("1", "2"), IntegerType, DecimalType(IntDecimal.precision - 1, 0)),
      (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision - 1, 0)),
      (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision, 1)),
      (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision, 1)),
      (Seq("1", "2"), IntegerType, DecimalType(IntDecimal.precision, 1)),
      (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1))
    )
  }
    testGluten(s"unsupported parquet conversion $fromType -> $toType") {
      checkAllParquetReaders(values, fromType, toType, expectError = true)
    }

  for {
    (fromPrecision, toPrecision) <-
      Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20)
  }
    testGluten(
      s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") {
      checkAllParquetReaders(
        values = Seq("1.23", "10.34"),
        fromType = DecimalType(fromPrecision, 2),
        toType = DecimalType(toPrecision, 2),
        expectError = true)
    }

  for {
    ((fromPrecision, fromScale), (toPrecision, toScale)) <-
      // Narrowing precision and scale by the same amount.
      Seq(
        (7, 4) -> (5, 2),
        (10, 7) -> (5, 2),
        (20, 17) -> (5, 2),
        (12, 4) -> (10, 2),
        (20, 17) -> (10, 2),
        (22, 4) -> (20, 2)) ++
        // Increasing precision and decreasing scale.
        Seq((10, 6) -> (12, 4)) ++
        // Decreasing precision and increasing scale.
        Seq((12, 4) -> (10, 6), (22, 5) -> (20, 7)) ++
        // Increasing precision by a smaller amount than scale.
        Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7))
  }
    testGluten(
      s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " +
        s"Decimal($toPrecision, $toScale)") {
      checkAllParquetReaders(
        values = Seq("1.23", "10.34"),
        fromType = DecimalType(fromPrecision, fromScale),
        toType = DecimalType(toPrecision, toScale),
        expectError = true)
    }
}
0 commit comments