我正在使用Salesforce Bulk 2.0 API来获取Salesforce Objects的数据。我已经创建了相应的函数,以便-
/**
* Fetches results from a Job ID of Batch query.
* NOTE : Provided Job ID must be in "JobComplete" state to fetch results
*
* @param jobId Job ID
* @param maxRecords Number of records to fetch in one chunk of API hit
* @return {@link String} CSV-formatted String/results (text/csv)
* @throws IOException Exception in mapping object
* @author ujjawal pandey
*/
public String getJobResults(String jobId, String maxRecords) throws IOException {
String getJobInfoUrl = String.format(JobResourcePath.getJobResultPath(), this.apiUrl,
this.apiVersion, jobId, maxRecords);
String sforceLocator = null;
String filePath = jobId + ".csv";
boolean isFirstDatasetFetch = true;
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
Files.newOutputStream(Paths.get(filePath)),
StandardCharsets.UTF_8));
do {
String getJobInfoUrlLocator = getJobInfoUrl;
HttpClient httpClient = HttpClientBuilder.create().build();
if (sforceLocator != null) {
getJobInfoUrlLocator = getJobInfoUrl + "&locator=" + sforceLocator;
}
LOGGER.info(getJobInfoUrlLocator);
HttpGet httpGet = new HttpGet(getJobInfoUrlLocator);
httpGet.setHeader("Authorization", "Bearer " + accessToken);
HttpResponse response = httpClient.execute(httpGet);
LOGGER.info(response.toString());
int responseCode = response.getStatusLine().getStatusCode();
String responseBody = "";
if (responseCode == Constants.HTTP_STATUS_OK) {
org.apache.http.HttpEntity entity = response.getEntity();
sforceLocator = response.getFirstHeader("Sforce-Locator").getValue();
LOGGER.info("Locator is: " + sforceLocator);
responseBody = EntityUtils.toString(entity);
LOGGER.debug(String.valueOf(responseBody));
if (isFirstDatasetFetch) {
writer.write(responseBody);
isFirstDatasetFetch = false;
} else {
writer.write(responseBody.substring(
responseBody.indexOf('\n') + 1));
}
} else {
LOGGER.error(responseBody);
throw new RuntimeException(responseBody);
}
} while (sforceLocator != null && !sforceLocator.equals("null"));
writer.close();
return filePath;
}
字符串
问题是创建的CSV格式是正确的,但在某些列中出现了一些奇怪的字符。例如,â¢
(欧元在它们之间,粘贴使其消失)
现在,当我在Spark中阅读时,使用以下配置-
spark.read()
.option("header", "true")
.option("delimiter", ",")
.option("lineSep", "\n")
.option("multiLine", "true")
.option("encoding", "UTF-8")
.csv(hdfsTempCsvStoragePath + "/" + csvPath);
型
我得到了额外的行(4),因为下面的字符“可能”。PFA。
x1c 0d1x的数据
我知道这个问题与编码有关,但与良好的理解无关。
1.我在这里错过了什么(基本概念),因为这个问题正在发生?
1.解决这个问题的最佳方法是什么?
因为我认为我错过了一些东西,所以我没有尝试太多。我认为最后的选择是我必须清理CSV,以便Spark可以读取它。
2条答案
按热度按时间bxfogqkk1#
我没有代表评论。这不是一个真正的答案。但我有几个意见,你应该会发现有帮助。我不知道Spark。
1.当执行创建CSV的Java时,将此属性添加到命令行:-Dfile.encoding=UTF-8
这将使UTF-8字符不会被篡改为两个非ASPLESS字符,并且它将保留可处理的非ASPLESS UTF-8字符。
1.如果你要清理CSV文件,这个正则表达式可以用来识别特殊字符:/[^\x00-\x7f]/
如果你可以访问从时间开始的数据,你可以使用这个正则表达式来识别所有之前需要注意的字符。
没有人可以建议你最好的方法,因为我们不知道数据,什么进程正在监视你的批量API作业(例如,如果你抛出一个异常,数据是否可以被处理以供人类审查),有多频繁地出现特殊字符(以及哪些字符),或者数据100%正确的紧迫性(或者即使这是可能的)。
zbq4xfa02#
实际上,这些额外的行不是因为下面的字符,而是因为上面的一行,它有双引号,那些没有转义,因此spark无法正确解析CSV,将下面的行分成多行。
字符串
另外,那些奇怪的字符实际上是“子弹”,但由于我的函数中的编码不匹配,它们就这样出现了。我用UTF-8编写CSV,但
EntityUtils.toString(entity);
正在将其转换为其他编码,所以我通过EntityUtils.toString(entity, "UTF-8")
强制将toString转换为UTF-8。