
java - CSVRecordReader and un-terminated quoted field at end of CSV line


I'm having trouble with the datasets I'm using. They are CSVs containing fake news. My problem is with the CSVRecordReader class that DataVec (Deeplearning4j) provides. I'm trying to run a Spark transform process, and I keep hitting the well-known "Un-terminated quoted field at end of CSV line" error.

Searching the internet, everyone suggests locating the rows where this happens and fixing the CSV by hand, but that would be very hard here, because the dataset contains the bodies (or parts of the bodies) of articles, which may be genuine or fake. Those articles include many quotations inside quotations and everything else typical of article text.
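For illustration, a hypothetical row of the kind described (not taken from the real dataset), with an un-doubled quotation mark inside the article text:

"id-001","Some Author","Witness: "We heard a loud bang","english","bias"

Per RFC 4180 the inner quotes would have to be doubled (""We heard a loud bang""); because they are not, a strict parser can reach the end of the physical line while it still considers a quoted field open, which is exactly the "Un-terminated quoted field at end of CSV line" failure.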

While looking for a solution, I ended up implementing my own CSVRecordReader on top of the Univocity CSV parser library, which is very flexible and solves every problem the current CSVRecordReader has. But now I face another dilemma: the library's parser does not implement the Serializable interface, so running the transform in Apache Spark throws an exception:

org.apache.spark.SparkException: Task not serializable Caused by: java.io.NotSerializableException: com.univocity.parsers.csv.CsvParser

How can I solve this?


My own CSVRecordReader code:

package cu.desoft.cav.RecordReader;

import com.univocity.parsers.common.IterableResult;
import com.univocity.parsers.common.ParsingContext;
import com.univocity.parsers.common.ResultIterator;
import com.univocity.parsers.csv.CsvParser;
import com.univocity.parsers.csv.CsvParserSettings;
import org.datavec.api.records.Record;
import org.datavec.api.records.metadata.RecordMetaData;
import org.datavec.api.records.metadata.RecordMetaDataLine;
import org.datavec.api.records.reader.impl.LineRecordReader;
import org.datavec.api.split.FileSplit;
import org.datavec.api.split.InputSplit;
import org.datavec.api.writable.Text;
import org.datavec.api.writable.Writable;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * @author: Acosta email: yunielacost738@gmail.com
 * created at: 11/25/2019
 */

public class UltraCSVRecordReader extends LineRecordReader {
    public static final char DEFAULT_DELIMITER = ',';
    public static final char DEFAULT_QUOTE = '"';
    public static final char DEFAULT_QUOTE_ESCAPE = '"';
    public static final char DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING = '\0';

    private CsvParser csvParser;
    private CsvParserSettings settings;
    private ResultIterator<String[], ParsingContext> iterator;

    public UltraCSVRecordReader() {
        this(0, DEFAULT_DELIMITER, DEFAULT_QUOTE, DEFAULT_QUOTE_ESCAPE, DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING);
    }

    /**
     * @param unknownFormat if you can't know the line endings, column delimiters and quotation characters,
     *                      set unknownFormat=true for automatic detection
     */
    public UltraCSVRecordReader(boolean unknownFormat) {
        this();
        if (unknownFormat) {
            settings = new CsvParserSettings();
            settings.detectFormatAutomatically();
            csvParser = new CsvParser(settings);
        }
    }

    public UltraCSVRecordReader(CsvParserSettings settings) {
        this.settings = settings;
        csvParser = new CsvParser(settings);
    }

    /**
     * @param skipNumLines              number of lines to skip
     * @param delimiter                 (default ,): value used to separate individual fields in the input
     * @param quote                     (default "): value used for escaping values where the field delimiter is part of
     *                                  the value (e.g. the value "a,b" is parsed as a,b)
     * @param quoteEscape               (default "): value used for escaping the quote character inside an already escaped value
     *                                  (e.g. the value " "" a,b "" " is parsed as " a,b ")
     * @param charToEscapeQuoteEscaping (default \0): value used for escaping the quote escape character, when quote and quote escape are different
     *                                  (e.g. the value “\ " a , b " \” is parsed as \ " a , b " \, if quote = ", quoteEscape = \ and charToEscapeQuoteEscaping = \)
     */
    public UltraCSVRecordReader(long skipNumLines, char delimiter, char quote, char quoteEscape,
                                char charToEscapeQuoteEscaping) {
        settings = new CsvParserSettings();
        settings.getFormat().setDelimiter(delimiter);
        settings.getFormat().setQuote(quote);
        settings.getFormat().setQuoteEscape(quoteEscape);
        settings.getFormat().setCharToEscapeQuoteEscaping(charToEscapeQuoteEscaping);
        settings.setNumberOfRowsToSkip(skipNumLines);
        csvParser = new CsvParser(settings);
    }

    /**
     * @param skipNumLines number of lines to skip
     */
    public UltraCSVRecordReader(long skipNumLines) {
        this(skipNumLines, DEFAULT_DELIMITER, DEFAULT_QUOTE, DEFAULT_QUOTE_ESCAPE, DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING);
    }

    /**
     * @param skipNumLines number of lines to skip
     * @param delimiter    (default ,): value used to separate individual fields in the input
     */
    public UltraCSVRecordReader(long skipNumLines, char delimiter) {
        this(skipNumLines, delimiter, DEFAULT_QUOTE, DEFAULT_QUOTE_ESCAPE, DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING);
    }

    /**
     * @param skipNumLines number of lines to skip
     * @param delimiter    (default ,): value used to separate individual fields in the input
     * @param quote        (default "): value used for escaping values where the field delimiter is part of
     *                     the value (e.g. the value "a,b" is parsed as a,b)
     */
    public UltraCSVRecordReader(long skipNumLines, char delimiter, char quote) {
        this(skipNumLines, delimiter, quote, DEFAULT_QUOTE_ESCAPE, DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING);
    }

    /**
     * @param skipNumLines number of lines to skip
     * @param delimiter    (default ,): value used to separate individual fields in the input
     * @param quote        (default "): value used for escaping values where the field delimiter is part of
     *                     the value (e.g. the value "a,b" is parsed as a,b)
     * @param quoteEscape  (default "): value used for escaping the quote character inside an already escaped value
     *                     (e.g. the value " "" a,b "" " is parsed as " a,b ")
     */
    public UltraCSVRecordReader(long skipNumLines, char delimiter, char quote, char quoteEscape) {
        this(skipNumLines, delimiter, quote, quoteEscape, DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING);
    }

    @Override
    public void initialize(InputSplit split) throws IOException, InterruptedException {
        super.initialize(split);
        this.initialize(((FileSplit) split).getRootDir());
    }

    public UltraCSVRecordReader maxLengthCharactersToParser(int numberCharacters) {
        this.settings.setMaxCharsPerColumn(numberCharacters);
        this.csvParser = new CsvParser(this.settings);
        return this;
    }

    public void initialize(File file) {
        IterableResult<String[], ParsingContext> iterate = this.csvParser.iterate(file);
        iterator = iterate.iterator();
    }

    protected List<Writable> parseLine(String line) {
        String[] split = this.csvParser.parseLine(line);
        List<Writable> values = new ArrayList<>();
        for (String value : split) {
            values.add(new Text(value));
        }
        return values;
    }

    public List<List<Writable>> next(int num) {
        List<List<Writable>> ret = new ArrayList<>(Math.min(num, 10000));
        int count = 0;

        while (this.hasNext() && count++ < num) {
            ret.add(this.next());
        }
        return ret;
    }

    public List<Writable> next() {
        String[] valuesSplit = iterator.next();
        List<Writable> values = new ArrayList<>();
        try {
            for (String value : valuesSplit) {
                values.add(new Text(value));
            }
        } catch (NullPointerException ex) {
            ex.printStackTrace();
            System.out.println(values);
            System.out.println("================================");
            System.out.println(Arrays.toString(valuesSplit));
        }

        return values;
    }

    public boolean batchesSupported() {
        return true;
    }

    public boolean hasNext() {
        return iterator.hasNext();
    }

    public Record nextRecord() {
        List<Writable> next = this.next();
        URI uri = this.locations != null && this.locations.length >= 1 ? this.locations[this.splitIndex] : null;
        RecordMetaData meta = new RecordMetaDataLine(this.lineIndex - 1, uri, UltraCSVRecordReader.class);
        return new org.datavec.api.records.impl.Record(next, meta);
    }

    public Record loadFromMetaData(RecordMetaData recordMetaData) throws IOException {
        return this.loadFromMetaData(Collections.singletonList(recordMetaData)).get(0);
    }

    public List<Record> loadFromMetaData(List<RecordMetaData> recordMetaDatas) throws IOException {
        List<Record> list = super.loadFromMetaData(recordMetaDatas);

        for (Record r : list) {
            String line = r.getRecord().get(0).toString();
            r.setRecord(this.parseLine(line));
        }

        return list;
    }

    public void reset() {
        super.reset();
    }

    public CsvParser getCsvParser() {
        return csvParser;
    }
}

Sample dataset (header row followed by the first record):

"uuid","ord_in_thread","author","published","title","text","language","crawled","site_url","country","domain_rank","thread_title","spam_score","main_img_url","replies_count","participants_count","likes","comments","shares","type" "6a175f46bcd24d39b3e962ad0f29936721db70db",0,"Barracuda Brigade","2016-10-26T21:41:00.000+03:00","Muslims BUSTED: They Stole Millions In Gov’t Benefits","Print They should pay all the back all the money plus interest. The entire family and everyone who came in with them need to be deported asap. Why did it take two years to bust them? Here we go again …another group stealing from the government and taxpayers! A group of Somalis stole over four million in government benefits over just 10 months! We’ve reported on numerous cases like this one where the Muslim refugees/immigrants commit fraud by scamming our system…It’s way out of control! More Related","english","2016-10-27T01:49:27.168+03:00","100percentfedup.com","US",25689,"Muslims BUSTED: They Stole Millions In Gov’t Benefits",0,"http://bb4sp.com/wp-content/uploads/2016/10/Fullscreen-capture-10262016-83501-AM.bmp.jpg",0,1,0,0,0,"bias"

This is my transform process:

package cu.desoft.cav.preprocessing;

import cu.desoft.cav.RecordReader.UltraCSVRecordReader;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.datavec.api.records.reader.RecordReader;
import org.datavec.api.records.reader.impl.csv.CSVRecordReader;
import org.datavec.api.transform.TransformProcess;
import org.datavec.api.transform.schema.Schema;
import org.datavec.api.writable.Writable;
import org.datavec.spark.transform.SparkTransformExecutor;
import org.datavec.spark.transform.misc.StringToWritablesFunction;
import org.datavec.spark.transform.misc.WritablesToStringFunction;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/**
 * author: acosta
 * email: yunielacosta738@gmail.com
 * Created on: 2/3/20
 */
public class FakeNewsTransformation {
    private final String DATSETS_PATH = "data/FakeNews/";

    public void transform(boolean useSparkLocal) {
        Schema schema = new Schema.Builder()
                .addColumnString("uuid")
                .addColumnInteger("ord_in_thread")
                .addColumnString("author")
                .addColumnString("published")
                .addColumnsString("title", "text", "language", "crawled", "site_url", "country")
                .addColumnInteger("domain_rank")
                .addColumnString("thread_title")
                .addColumnsInteger("spam_score", "main_img_url", "replies_count", "participants_count", "likes", "comments", "shares")
                .addColumnCategorical("type", Arrays.asList("bias", "bs", "conspiracy", "fake", "hate", "junksci", "satire", "state"))
                .build();

        TransformProcess tp = new TransformProcess.Builder(schema)
                .removeColumns("uuid", "ord_in_thread", "author", "published", "site_url", "country", "thread_title")
                .categoricalToInteger("type")
                .build();

        int numActions = tp.getActionList().size();
        for (int i = 0; i < numActions; i++) {
            System.out.println("\n\n===============================");
            System.out.println("--- Schema after step " + i +
                    " (" + tp.getActionList().get(i) + ")--");
            System.out.println(tp.getSchemaAfterStep(i));
        }

        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        sparkConf.set("spark.kryo.registrator", "org.nd4j.Nd4jRegistrator");
        if (useSparkLocal) {
            sparkConf.setMaster("local[*]");
        }

        sparkConf.setAppName("Fake News Spanish Corpus dataset transformation");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        //Load our data using Spark
        JavaRDD<String> lines = sc.textFile(DATSETS_PATH + "fake.csv");
        int skipNumLines = 1;
        //We first need to parse this format. It's comma-delimited (CSV) format, so let's parse it using CSVRecordReader:
        RecordReader rr = new UltraCSVRecordReader();
        // RecordReader rr = new CSVRecordReader();
        JavaRDD<List<Writable>> parsedInputData = lines.map(new StringToWritablesFunction(rr));

        //Now, let's execute the transforms we defined earlier:
        JavaRDD<List<Writable>> processedData = SparkTransformExecutor.execute(parsedInputData, tp);

        //For the sake of this example, let's collect the data locally and print it:
        JavaRDD<String> processedAsString = processedData.map(new WritablesToStringFunction(","));
        System.out.println("<<<<<<<<<<<<<<<PATH>>>>>>>>>>>>>");
        File dataset = new File("dataset/FakeNews");
        if (dataset.exists()) {
            try {
                FileUtils.deleteDirectory(dataset);
                System.out.println("DELETE THE DIRECTORY");
            } catch (IOException e) {
                System.out.println("The directory was not deleted");
                e.printStackTrace();
            }
        }
        System.out.println(dataset.getAbsolutePath());
        System.out.println("<<<<<<<<<<<<<<<END-PATH>>>>>>>>>>>>>");
        processedAsString.saveAsTextFile("file://" + dataset.getAbsolutePath()); //To save locally
        //processedAsString.saveAsTextFile("hdfs://your/hdfs/save/path/here"); //To save to hdfs

        List<String> processedCollected = processedAsString.collect();
        List<String> inputDataCollected = lines.collect();
    }

    public static void main(String[] args) {
        new FakeNewsTransformation().transform(true);
    }
}

This is the error output when I use CSVRecordReader (DataVec):

    java.lang.RuntimeException: java.io.IOException: Un-terminated quoted field at end of CSV line
at org.datavec.api.records.reader.impl.csv.CSVRecordReader.parseLine(CSVRecordReader.java:183)
at org.datavec.api.records.reader.impl.csv.CSVRecordReader.next(CSVRecordReader.java:175)
at org.datavec.spark.transform.misc.StringToWritablesFunction.call(StringToWritablesFunction.java:41)
at org.datavec.spark.transform.misc.StringToWritablesFunction.call(StringToWritablesFunction.java:33)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1211)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1210)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1210)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1218)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1197)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Un-terminated quoted field at end of CSV line
at org.datavec.api.records.reader.impl.csv.SerializableCSVParser.parseLine(SerializableCSVParser.java:276)
at org.datavec.api.records.reader.impl.csv.SerializableCSVParser.parseLine(SerializableCSVParser.java:186)
at org.datavec.api.records.reader.impl.csv.CSVRecordReader.parseLine(CSVRecordReader.java:181)
... 21 more

And this is the serialization problem when I use my own CSVRecordReader with the univocity CSV parser (the library does not implement Serializable):

  Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.map(RDD.scala:369)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:93)
at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:45)
at cu.desoft.cav.preprocessing.FakeNewsTransformation.transform(FakeNewsTransformation.java:71)
at cu.desoft.cav.preprocessing.FakeNewsTransformation.main(FakeNewsTransformation.java:101)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMainV2.main(AppMainV2.java:131)
Caused by: java.io.NotSerializableException: com.univocity.parsers.csv.CsvParser
Serialization stack:
- object not serializable (class: com.univocity.parsers.csv.CsvParser, value: com.univocity.parsers.csv.CsvParser@75b6dd5b)
- field (class: cu.desoft.cav.RecordReader.UltraCSVRecordReader, name: csvParser, type: class com.univocity.parsers.csv.CsvParser)
- object (class cu.desoft.cav.RecordReader.UltraCSVRecordReader, cu.desoft.cav.RecordReader.UltraCSVRecordReader@1fedf0a4)
- field (class: org.datavec.spark.transform.misc.StringToWritablesFunction, name: recordReader, type: interface org.datavec.api.records.reader.RecordReader)
- object (class org.datavec.spark.transform.misc.StringToWritablesFunction, org.datavec.spark.transform.misc.StringToWritablesFunction@465b38e6)
- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 18 more

Best Answer

You will have to fix the offending rows in the CSV, or make your record reader serializable.
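For the second option, a minimal sketch of what a serializable variant could look like (the class and method names here are hypothetical, not from the question): the serialization stack above only flags com.univocity.parsers.csv.CsvParser, so the idea is to keep nothing but primitive format options as instance fields, mark the parser transient, and rebuild it lazily on the executor. This assumes the DataVec LineRecordReader base class itself serializes cleanly.

import com.univocity.parsers.csv.CsvParser;
import com.univocity.parsers.csv.CsvParserSettings;
import org.datavec.api.records.reader.impl.LineRecordReader;
import org.datavec.api.writable.Text;
import org.datavec.api.writable.Writable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: only primitive configuration travels with the Spark task,
// the univocity parser itself is recreated after deserialization.
public class SerializableUltraCSVRecordReader extends LineRecordReader {
    private final long skipNumLines;
    private final char delimiter;
    private final char quote;
    private final char quoteEscape;

    // CsvParser is not Serializable, so it must never be part of the serialized closure
    private transient CsvParser csvParser;

    public SerializableUltraCSVRecordReader(long skipNumLines, char delimiter, char quote, char quoteEscape) {
        this.skipNumLines = skipNumLines;
        this.delimiter = delimiter;
        this.quote = quote;
        this.quoteEscape = quoteEscape;
    }

    private CsvParser parser() {
        if (csvParser == null) { // null again on the executor after deserialization
            CsvParserSettings settings = new CsvParserSettings();
            settings.getFormat().setDelimiter(delimiter);
            settings.getFormat().setQuote(quote);
            settings.getFormat().setQuoteEscape(quoteEscape);
            settings.setNumberOfRowsToSkip(skipNumLines);
            csvParser = new CsvParser(settings);
        }
        return csvParser;
    }

    protected List<Writable> parseLine(String line) {
        List<Writable> values = new ArrayList<>();
        String[] split = parser().parseLine(line);
        if (split == null) { // univocity returns null for an empty input line
            return values;
        }
        for (String value : split) {
            values.add(new Text(value));
        }
        return values;
    }
}

The same transient-and-rebuild pattern would also have to be applied to the iterator field of the original reader, since the univocity ResultIterator is unlikely to be serializable either; whether the rest of UltraCSVRecordReader then behaves correctly under StringToWritablesFunction still needs to be verified.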
Based on this, univocity-parsers "handles values with unescaped quotes, and you can configure it to throw an exception if any are found". Maybe give that a try?
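If that is the route you take, the relevant switch in univocity-parsers 2.x is, as far as I can tell, CsvParserSettings.setUnescapedQuoteHandling. A minimal sketch of both behaviours (the helper class and method names are made up for the example):

import com.univocity.parsers.csv.CsvParserSettings;
import com.univocity.parsers.csv.UnescapedQuoteHandling;

public final class FakeNewsCsvSettings {
    // Hypothetical helper: builds settings that either tolerate or reject unescaped quotes.
    public static CsvParserSettings tolerant() {
        CsvParserSettings settings = new CsvParserSettings();
        // Keep reading a broken quoted value until the real closing quote is found
        settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_CLOSING_QUOTE);
        return settings;
    }

    public static CsvParserSettings strict() {
        CsvParserSettings settings = new CsvParserSettings();
        // Fail fast so the offending rows can be located and cleaned up
        settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.RAISE_ERROR);
        return settings;
    }
}

Either settings object could then be passed to the UltraCSVRecordReader(CsvParserSettings) constructor that the question already defines, e.g. new UltraCSVRecordReader(FakeNewsCsvSettings.tolerant()).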

Regarding java - CSVRecordReader and un-terminated quoted field at end of CSV line, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/60041623/
