作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用 Zeppelin 运行 Scala Spark 作业。
当我运行它时,我收到以下错误:
latestForEachKey: org.apache.spark.sql.DataFrame = [PartitionStatement_1: string, PartitionYear_1: string ... 64 more fields]
<console>:393: error: Could not write class $$$$2e6199f161363585e7ae9b28bcf8535e$$$$iw because it exceeds JVM code size limits. Method <init>'s code too large!
class $iw extends Serializable {
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import java.io.File
import org.apache.hadoop.fs._
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract
val getPartition = spark.udf.register("getPartition", (filePath: String) => filePath.split("\\.")(3))
val getYearAndStatementTypeCodePartition = spark.udf.register("getPartition", (filePath: String) => filePath.split("\\.")(4))
val get_partition_Year = spark.udf.register("get_partition_Year", (df1resultFinal: String) => df1resultFinal.split("-")(0))
val get_partition_Statement = spark.udf.register("get_partition_Year", (df1resultFinal: String) => df1resultFinal.split("-")(1))
val rdd = sc.textFile("s3://trfsmallfffile/FinancialStatementLineItem/MAIN")
val header = rdd.filter(_.contains("uniqueFundamentalSet")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("uniqueFundamentalSet")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)
val schemaHeader = StructType(header.map(cols => StructField(cols.replace(".", "."), StringType)).toSeq)
val dataHeader = sqlContext.createDataFrame(rdd.filter(!_.contains("uniqueFundamentalSet")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schemaHeader)
val df1resultFinal=data.withColumn("DataPartition", getPartition(input_file_name))
val df1resultFinalWithYear=df1resultFinal.withColumn("PartitionYear", get_partition_Year(getYearAndStatementTypeCodePartition(input_file_name)))
val df1resultFinalWithAllPartition=df1resultFinalWithYear.withColumn("PartitionStatement", get_partition_Statement(getYearAndStatementTypeCodePartition(input_file_name)))
val df1resultFinalwithTimestamp=df1resultFinalWithAllPartition
.withColumn("CapitalChangeAdjustmentDate",date_format(col("CapitalChangeAdjustmentDate"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
.withColumn("FinancialStatementLineItemValue", regexp_replace(format_number($"FinancialStatementLineItemValue".cast(DoubleType), 5), ",", ""))
.withColumn("AdjustedForCorporateActionValue", regexp_replace(format_number($"AdjustedForCorporateActionValue".cast(DoubleType), 5), ",", ""))
.withColumn("IsAsReportedCurrencySetManually", regexp_replace(format_number($"IsAsReportedCurrencySetManually".cast(DoubleType), 5), ",", ""))
.withColumn("ItemDisplayedValue", regexp_replace(format_number($"ItemDisplayedValue".cast(DoubleType), 5), ",", ""))
.withColumn("ReportedValue", regexp_replace(format_number($"ReportedValue".cast(DoubleType), 5), ",", ""))
.withColumn("AsReportedExchangeRate", regexp_replace(format_number($"AsReportedExchangeRate".cast(DoubleType), 5), ",", ""))
.withColumn("FinancialStatementLineItemValueUpperRange", regexp_replace(format_number($"FinancialStatementLineItemValueUpperRange".cast(DoubleType), 5), ",", ""))
.withColumn("FinancialStatementLineItemValueUpperRange", regexp_replace(format_number($"FinancialStatementLineItemValueUpperRange".cast(DoubleType), 5), ",", ""))
//Loading Incremental
val rdd1 = sc.textFile("s3://trfsmallfffile/FinancialStatementLineItem/INCR")
val header1 = rdd1.filter(_.contains("uniqueFundamentalSet")).map(line => line.split("\\|\\^\\|")).first()
val schema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data1 = sqlContext.createDataFrame(rdd1.filter(!_.contains("uniqueFundamentalSet")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema1)
val df2resultTimestamp=data1
.withColumn("CapitalChangeAdjustmentDate_1",date_format(col("CapitalChangeAdjustmentDate_1"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
.withColumn("FinancialStatementLineItemValue_1", regexp_replace(format_number($"FinancialStatementLineItemValue_1".cast(DoubleType), 5), ",", ""))
.withColumn("AdjustedForCorporateActionValue_1", regexp_replace(format_number($"AdjustedForCorporateActionValue_1".cast(DoubleType), 5), ",", ""))
.withColumn("IsAsReportedCurrencySetManually_1", regexp_replace(format_number($"IsAsReportedCurrencySetManually_1".cast(DoubleType), 5), ",", ""))
.withColumn("ItemDisplayedValue_1", regexp_replace(format_number($"ItemDisplayedValue_1".cast(DoubleType), 5), ",", ""))
.withColumn("ReportedValue_1", regexp_replace(format_number($"ReportedValue_1".cast(DoubleType), 5), ",", ""))
.withColumn("AsReportedExchangeRate_1", regexp_replace(format_number($"AsReportedExchangeRate_1".cast(DoubleType), 5), ",", ""))
.withColumn("FinancialStatementLineItemValueUpperRange_1", regexp_replace(format_number($"FinancialStatementLineItemValueUpperRange_1".cast(DoubleType), 5), ",", ""))
.withColumn("FinancialStatementLineItemValueUpperRange_1", regexp_replace(format_number($"FinancialStatementLineItemValueUpperRange_1".cast(DoubleType), 5), ",", ""))
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("FinancialStatementLineItem_lineItemId", "PeriodId","SourceId","StatementTypeCode","StatementCurrencyId","uniqueFundamentalSet").orderBy($"TimeStamp".cast(LongType).desc)
val latestForEachKey = df2resultTimestamp.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")
val dfMainOutput = df1resultFinalwithTimestamp.join(latestForEachKey, Seq("FinancialStatementLineItem_lineItemId", "PeriodId","SourceId","StatementTypeCode","StatementCurrencyId","uniqueFundamentalSet"), "outer")
.select($"uniqueFundamentalSet",$"PeriodId",$"SourceId",$"StatementTypeCode",$"StatementCurrencyId",$"FinancialStatementLineItem_lineItemId",
when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition").as("DataPartition"),
when($"PartitionYear_1".isNotNull, $"PartitionYear_1").otherwise($"PartitionYear").as("PartitionYear"),
when($"PartitionStatement_1".isNotNull, $"PartitionStatement_1").otherwise($"PartitionStatement").as("PartitionStatement"),
when($"FinancialAsReportedLineItemName_1".isNotNull, $"FinancialAsReportedLineItemName_1").otherwise($"FinancialAsReportedLineItemName").as("FinancialAsReportedLineItemName"),
when($"FinancialAsReportedLineItemName_languageId_1".isNotNull, $"FinancialAsReportedLineItemName_languageId_1").otherwise($"FinancialAsReportedLineItemName_languageId").as("FinancialAsReportedLineItemName_languageId"),
when($"FinancialStatementLineItemValue_1".isNotNull, $"FinancialStatementLineItemValue_1").otherwise($"FinancialStatementLineItemValue").as("FinancialStatementLineItemValue"),
when($"AdjustedForCorporateActionValue_1".isNotNull, $"AdjustedForCorporateActionValue_1").otherwise($"AdjustedForCorporateActionValue").as("AdjustedForCorporateActionValue"),
when($"ReportedCurrencyId_1".isNotNull, $"ReportedCurrencyId_1").otherwise($"ReportedCurrencyId").as("ReportedCurrencyId"),
when($"IsAsReportedCurrencySetManually_1".isNotNull, $"IsAsReportedCurrencySetManually_1").otherwise($"IsAsReportedCurrencySetManually").as("IsAsReportedCurrencySetManually"),
when($"Unit_1".isNotNull, $"Unit_1").otherwise($"Unit").as("Unit"),
when($"IsTotal_1".isNotNull, $"IsTotal_1").otherwise($"IsTotal").as("IsTotal"),
when($"StatementSectionCode_1".isNotNull, $"StatementSectionCode_1").otherwise($"StatementSectionCode").as("StatementSectionCode"),
when($"DimentionalLineItemId_1".isNotNull, $"DimentionalLineItemId_1").otherwise($"DimentionalLineItemId").as("DimentionalLineItemId"),
when($"IsDerived_1".isNotNull, $"IsDerived_1").otherwise($"IsDerived").as("IsDerived"),
when($"EstimateMethodCode_1".isNotNull, $"EstimateMethodCode_1").otherwise($"EstimateMethodCode").as("EstimateMethodCode"),
when($"EstimateMethodNote_1".isNotNull, $"EstimateMethodNote_1").otherwise($"EstimateMethodNote").as("EstimateMethodNote"),
when($"EstimateMethodNote_languageId_1".isNotNull, $"EstimateMethodNote_languageId_1").otherwise($"EstimateMethodNote_languageId").as("EstimateMethodNote_languageId"),
when($"FinancialLineItemSource_1".isNotNull, $"FinancialLineItemSource_1").otherwise($"FinancialLineItemSource").as("FinancialLineItemSource"),
when($"IsCombinedItem_1".isNotNull, $"IsCombinedItem_1").otherwise($"IsCombinedItem").as("IsCombinedItem"),
when($"IsExcludedFromStandardization_1".isNotNull, $"IsExcludedFromStandardization_1").otherwise($"IsExcludedFromStandardization").as("IsExcludedFromStandardization"),
when($"DocByteOffset_1".isNotNull, $"DocByteOffset_1").otherwise($"DocByteOffset").as("DocByteOffset"),
when($"DocByteLength_1".isNotNull, $"DocByteLength_1").otherwise($"DocByteLength").as("DocByteLength"),
when($"BookMark_1".isNotNull, $"BookMark_1").otherwise($"BookMark").as("BookMark"),
when($"ItemDisplayedNegativeFlag_1".isNotNull, $"ItemDisplayedNegativeFlag_1").otherwise($"ItemDisplayedNegativeFlag").as("ItemDisplayedNegativeFlag"),
when($"ItemScalingFactor_1".isNotNull, $"ItemScalingFactor_1").otherwise($"ItemScalingFactor").as("ItemScalingFactor"),
when($"ItemDisplayedValue_1".isNotNull, $"ItemDisplayedValue_1").otherwise($"ItemDisplayedValue").as("ItemDisplayedValue"),
when($"ReportedValue_1".isNotNull, $"ReportedValue_1").otherwise($"ReportedValue").as("ReportedValue"),
when($"EditedDescription_1".isNotNull, $"EditedDescription_1").otherwise($"EditedDescription").as("EditedDescription"),
when($"EditedDescription_languageId_1".isNotNull, $"EditedDescription_languageId_1").otherwise($"EditedDescription_languageId").as("EditedDescription_languageId"),
when($"ReportedDescription_1".isNotNull, $"ReportedDescription_1").otherwise($"ReportedDescription").as("ReportedDescription"),
when($"ReportedDescription_languageId_1".isNotNull, $"ReportedDescription_languageId_1").otherwise($"ReportedDescription_languageId").as("ReportedDescription_languageId"),
when($"AsReportedInstanceSequence_1".isNotNull, $"AsReportedInstanceSequence_1").otherwise($"AsReportedInstanceSequence").as("AsReportedInstanceSequence"),
when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"),
when($"FinancialStatementLineItemSequence_1".isNotNull, $"FinancialStatementLineItemSequence_1").otherwise($"FinancialStatementLineItemSequence").as("FinancialStatementLineItemSequence"),
when($"SystemDerivedTypeCode_1".isNotNull, $"SystemDerivedTypeCode_1").otherwise($"SystemDerivedTypeCode").as("SystemDerivedTypeCode"),
when($"AsReportedExchangeRate_1".isNotNull, $"AsReportedExchangeRate_1").otherwise($"AsReportedExchangeRate").as("AsReportedExchangeRate"),
when($"AsReportedExchangeRateSourceCurrencyId_1".isNotNull, $"AsReportedExchangeRateSourceCurrencyId_1").otherwise($"AsReportedExchangeRateSourceCurrencyId").as("AsReportedExchangeRateSourceCurrencyId"),
when($"ThirdPartySourceCode_1".isNotNull, $"ThirdPartySourceCode_1").otherwise($"ThirdPartySourceCode").as("ThirdPartySourceCode"),
when($"FinancialStatementLineItemValueUpperRange_1".isNotNull, $"FinancialStatementLineItemValueUpperRange_1").otherwise($"FinancialStatementLineItemValueUpperRange").as("FinancialStatementLineItemValueUpperRange"),
when($"FinancialStatementLineItemLocalLanguageLabel_1".isNotNull, $"FinancialStatementLineItemLocalLanguageLabel_1").otherwise($"FinancialStatementLineItemLocalLanguageLabel").as("FinancialStatementLineItemLocalLanguageLabel"),
when($"FinancialStatementLineItemLocalLanguageLabel_languageId_1".isNotNull, $"FinancialStatementLineItemLocalLanguageLabel_languageId_1").otherwise($"FinancialStatementLineItemLocalLanguageLabel_languageId").as("FinancialStatementLineItemLocalLanguageLabel_languageId"),
when($"IsFinal_1".isNotNull, $"IsFinal_1").otherwise($"IsFinal").as("IsFinal"),
when($"FinancialStatementLineItem_lineItemInstanceKey_1".isNotNull, $"FinancialStatementLineItem_lineItemInstanceKey_1").otherwise($"FinancialStatementLineItem_lineItemInstanceKey").as("FinancialStatementLineItem_lineItemInstanceKey"),
when($"StatementSectionIsCredit_1".isNotNull, $"StatementSectionIsCredit_1").otherwise($"StatementSectionIsCredit").as("StatementSectionIsCredit"),
when($"CapitalChangeAdjustmentDate_1".isNotNull, $"CapitalChangeAdjustmentDate_1").otherwise($"CapitalChangeAdjustmentDate").as("CapitalChangeAdjustmentDate"),
when($"ParentLineItemId_1".isNotNull, $"ParentLineItemId_1").otherwise($"ParentLineItemId").as("ParentLineItemId"),
when($"EstimateMethodId_1".isNotNull, $"EstimateMethodId_1").otherwise($"EstimateMethodId").as("EstimateMethodId"),
when($"StatementSectionId_1".isNotNull, $"StatementSectionId_1").otherwise($"StatementSectionId").as("StatementSectionId"),
when($"SystemDerivedTypeCodeId_1".isNotNull, $"SystemDerivedTypeCodeId_1").otherwise($"SystemDerivedTypeCodeId").as("SystemDerivedTypeCodeId"),
when($"UnitEnumerationId_1".isNotNull, $"UnitEnumerationId_1").otherwise($"UnitEnumerationId").as("UnitEnumerationId"),
when($"FiscalYear_1".isNotNull, $"FiscalYear_1").otherwise($"FiscalYear").as("FiscalYear"),
when($"IsAnnual_1".isNotNull, $"IsAnnual_1").otherwise($"IsAnnual").as("IsAnnual"),
when($"PeriodPermId_1".isNotNull, $"PeriodPermId_1").otherwise($"PeriodPermId").as("PeriodPermId"),
when($"PeriodPermId_objectTypeId_1".isNotNull, $"PeriodPermId_objectTypeId_1").otherwise($"PeriodPermId_objectTypeId").as("PeriodPermId_objectTypeId"),
when($"PeriodPermId_objectType_1".isNotNull, $"PeriodPermId_objectType_1").otherwise($"PeriodPermId_objectType").as("PeriodPermId_objectType"),
when($"AuditID_1".isNotNull, $"AuditID_1").otherwise($"AuditID").as("AuditID"),
when($"AsReportedItemId_1".isNotNull, $"AsReportedItemId_1").otherwise($"AsReportedItemId").as("AsReportedItemId"),
when($"ExpressionInstanceId_1".isNotNull, $"ExpressionInstanceId_1").otherwise($"ExpressionInstanceId").as("ExpressionInstanceId"),
when($"ExpressionText_1".isNotNull, $"ExpressionText_1").otherwise($"ExpressionText").as("ExpressionText"),
when($"FFAction_1".isNotNull, $"FFAction_1").otherwise($"FFAction|!|").as("FFAction|!|"))
.filter(!$"FFAction|!|".contains("D|!|"))
val dfMainOutputFinal = dfMainOutput.na.fill("").select($"DataPartition", $"PartitionYear", $"PartitionStatement",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").filter(_ != "PartitionYear").filter(_ != "PartitionStatement").map(c => col(c)): _*).as("concatenated"))
val headerColumn = dataHeader.columns.toSeq
val header = headerColumn.mkString("", "|^|", "|!|").dropRight(3)
val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "|^|null", "")).withColumnRenamed("concatenated", header)
dfMainOutputFinalWithoutNull.repartition(1).write.partitionBy("DataPartition","PartitionYear","PartitionStatement")
.format("csv")
.option("nullValue", "")
.option("delimiter", "\t")
.option("quote", "\u0000")
.option("header", "true")
.option("codec", "gzip")
.save("s3://trfsmallfffile/FinancialStatementLineItem/output")
val FFRowCount =dfMainOutputFinalWithoutNull.groupBy("DataPartition","PartitionYear","PartitionStatement").count
FFRowCount.coalesce(1).write.format("com.databricks.spark.xml")
.option("rootTag", "FFFileType")
.option("rowTag", "FFPhysicalFile")
.save("s3://trffullfiles/FinancialStatementLineItem/Descr")
最佳答案
Java(以及扩展 Scala)中方法的最大大小为 64KB 字节码,参见例如问题here .这意味着您拥有太多代码而没有将其拆分为多个方法。
在你的情况下,我会推荐以下内容:
dfMainOutput
时有很多的 when
声明,应该可以以更有效和更好看的方式做到这一点。 loadData()
的方法。它读取返回数据帧的数据库和另一种合并 df1resultFinalwithTimestamp
的方法与 latestForEachKey
.您可以为代码的每个部分/部分创建方法。 关于scala - 错误 : Could not write class iw because it exceeds JVM code size limits. 方法代码太大,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48162243/
我是一名优秀的程序员,十分优秀!