
apache-spark - Is there a way to load a file from an FTP server using TLS with Spark


I am moving a Python process to Spark. In Python, we use ftplib to connect to the server and download the file to an EC2 instance; once it is downloaded, we upload it to S3. We are transitioning to a serverless infrastructure and want to load the file in Spark via AWS Glue, then move it to S3 with a multipart upload. I have tried running the current code on a larger Glue instance type, but the machine still runs out of memory (20 GB file).
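(For reference, the failure mode here is buffering the whole file. A sketch of the streaming alternative is below: it feeds ftplib's download blocks straight into an S3 multipart upload, so at most one ~100 MB part sits in memory at a time. The boto3 usage and all names are illustrative, not part of the original pipeline.)

import ftplib
import boto3

def stream_ftp_to_s3(host, user, password, remote_file, bucket, key):
    """Stream an FTPS download directly into an S3 multipart upload."""
    s3 = boto3.client("s3")
    upload = s3.create_multipart_upload(Bucket=bucket, Key=key)
    parts = []
    buf = bytearray()
    part_no = 1

    def flush():
        nonlocal buf, part_no
        resp = s3.upload_part(
            Bucket=bucket, Key=key, PartNumber=part_no,
            UploadId=upload["UploadId"], Body=bytes(buf),
        )
        parts.append({"ETag": resp["ETag"], "PartNumber": part_no})
        part_no += 1
        buf = bytearray()

    def on_block(block):
        buf.extend(block)
        # S3 parts must be at least 5 MB (except the last); 100 MB keeps
        # the part count low for a 20 GB file.
        if len(buf) >= 100 * 1024 * 1024:
            flush()

    ftp = ftplib.FTP_TLS(host)
    ftp.login(user, password)
    ftp.prot_p()  # encrypt the data channel as well
    ftp.retrbinary("RETR " + remote_file, on_block)
    ftp.quit()

    if buf:
        flush()
    s3.complete_multipart_upload(
        Bucket=bucket, Key=key, UploadId=upload["UploadId"],
        MultipartUpload={"Parts": parts},
    )

Error handling (calling abort_multipart_upload on failure) is omitted for brevity.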

Old Python code

"""
This script will get the backup file
"""

import sys
from datetime import datetime
import re

import ftplib
from retry import retry
import shutil

from tools.python.s3_functions import s3_upload
from python_scripts.get import *


def get_ftp_connector(path, user, password):
ftp = ftplib.FTP_TLS(path)
ftp.login(user, password)
ftp.prot_p()
return ftp


def get_ftp_files_list(ftp, dir):
ftp.cwd(dir)
files = ftp.nlst()
print(str("-".join(files)))
if "filecompleted.txt" not in files:
print("Failed to find filescompleted.txt file in ftp server.")
raise Exception("Failed to find filescompleted.txt file in ftp server.")
regex_str = 'Backup_File_Mask_Goes_here([\d]{8}).bak'
find_date_regex = re.compile(regex_str)
searched = [(f, find_date_regex.match(f)) for f in files if find_date_regex.match(f)]
searched = \
[(file_name, datetime.strptime(regex_result.groups()[0], '%Y%m%d')) for file_name, regex_result in searched]
searched = sorted(searched, key=lambda elem: elem[1], reverse=True)
if not searched:
print("Failed to find appropriate file in ftp server.")
raise Exception("Failed to find appropriate file in ftp server.")
return searched[0]


class FtpUploadTracker:
size_written = 0
total_size = 0
last_shown_percent = "X"

def __init__(self, total_size, bk_file):
self.total_size = total_size
self.bk_file = bk_file
self.output_file = open(self.bk_file, 'wb')
self.start_time = datetime.now()

def handle(self, block):
self.size_written += len(block)
percent_complete = str(round((self.size_written / self.total_size) * 100, 1))
self.output_file.write(block)
time_elapsed = (datetime.now() - self.start_time).total_seconds()
speed = round(self.size_written / (1000 * 1000 * time_elapsed), 2)
msg = "{percent}% complete @ average speed of {speed}MB/s : total run time {minutes}m".\
format(percent=percent_complete, speed=speed, minutes=round(time_elapsed/60))
if time_elapsed > 600 and speed < 1:
print("Zombie connection, failing dl.")
raise Exception("Zombie connection, failing dl.")
if self.last_shown_percent != percent_complete:
self.last_shown_percent = percent_complete
print(msg)

def close(self):
self.output_file.close()


@retry(tries=4, delay=300)
def retrieve_db():
"""
This function will retrieve via FTP the backup
:return: None
"""
ftp = get_ftp_connector(FTP_PATH, FTP_USER, FTP_PASSWORD)
# return back the most recent entry
file_name, file_date = get_ftp_files_list(ftp, 'database')
file_epoch = (file_date - datetime(1970, 1, 1)).total_seconds()
new_file_name = "backup_{epoch}.bak".format(epoch=str(int(file_epoch)))
if os.path.exists(DATAFILEPATH):
shutil.rmtree(DATAFILEPATH)
if not os.path.exists(DATAFILEPATH):
os.makedirs(DATAFILEPATH)
temp_backup_file_location = os.path.join(DATAFILEPATH + new_file_name)
print("Found file {file_name}, and downloading it to {loc}".
format(file_name=file_name, loc=temp_backup_file_location))
ftp_handler = FtpUploadTracker(ftp.size(file_name), temp_backup_file_location)
ftp.retrbinary("RETR " + file_name, ftp_handler.handle)
ftp.quit()
ftp_handler.close()
print("Finished download. Uploading to S3.")
s3_upload(DATAFILEPATH, new_file_name, bucket, "db_backup")
os.remove(temp_backup_file_location)


def main():
try:
retrieve_db()
except Exception as e:
print("Failed to download backup after 4 tries with error {e}.".format(e=e))
return 1
return 0


if __name__ == "__main__":
rtn = main()
sys.exit(rtn)

New Spark code (work in progress): the username contains a `|` character, which forces me to URL-encode the URI. When I run the code, the connection is refused. I am able to use the same connection information from Python.

from pyspark import SparkContext
from pyspark import SparkFiles
import urllib.parse

sc = SparkContext()
ftp_path = "ftp://Username:password@ftplocation.com/path_to_file"
# quote() percent-encodes the "|" in the username; urlencode() is for query
# dicts and cannot be applied to a URI string.
file_path_clean = urllib.parse.quote(ftp_path, safe=":/@")
print(f"file_path_clean: {file_path_clean}")
sc.addFile(file_path_clean)
filename = SparkFiles.get(file_path_clean.split('/')[-1])
print(f"filename: {filename}")

rdd = sc.textFile("file://" + filename)
print("We got past rdd = sc.textFile(file:// + filename)")
rdd.take(10)
# collect() pulls the entire RDD onto the driver, which is another place a
# 20 GB file can exhaust memory.
rdd.collect()
print(rdd)

Best Answer

Three ways to solve the problem:

  1. Use a mounted file system backed by FTP, and write to it from Spark (see the mount sketch after this list).

  2. Use a Spark-to-SFTP connector such as spark-sftp (see the PySpark sketch after this list).

  3. Write the file somewhere else with Spark, then copy it to SFTP as a separate step. This is the path we took, because of various reliability issues with SFTP and because Spark leaves partial output behind when a write operation fails. We write terabytes to SFTP endpoints using Scala code similar to the block that follows the two sketches below. Hope it helps with your Python work.
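For option 1, a minimal sketch, assuming curlftpfs is installed on every node, the server accepts TLS via curlftpfs's ssl option, and the mount point and credentials below are placeholders:

from pyspark.sql import SparkSession
import subprocess

spark = SparkSession.builder.appName("ftp-mount-write").getOrCreate()

# Mount the FTPS server as a local directory via FUSE. For Spark to write
# through it, the same mount must exist on every executor node.
mount_point = "/mnt/ftp"  # hypothetical mount point
subprocess.run(
    ["curlftpfs", "-o", "ssl", "ftp://Username:password@ftplocation.com/", mount_point],
    check=True,
)

# Spark now sees the remote directory as an ordinary local path.
df = spark.createDataFrame([("example",)], ["value"])  # stand-in data
df.write.mode("overwrite").text(f"file://{mount_point}/out")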
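For option 2, a PySpark sketch of the spark-sftp connector. The option names follow the connector's README; the artifact coordinate and fileType are assumptions for illustration, so match them to your Spark/Scala version and to the actual file format:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("sftp-load")
    # Hypothetical coordinate; pick the artifact matching your cluster.
    .config("spark.jars.packages", "com.springml:spark-sftp_2.11:1.1.3")
    .getOrCreate()
)

# Reads the remote file over SFTP into a DataFrame; fileType must match
# the actual format (csv, json, avro, parquet, ...).
df = (
    spark.read.format("com.springml.spark.sftp")
    .option("host", "ftplocation.com")
    .option("username", "Username")
    .option("password", "password")
    .option("fileType", "csv")
    .load("/path_to_file")
)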

// Note: BlockingRetry, Backoff, Retry, Recovery, Ignored, and FInfo are helper
// types from the answerer's internal library and are not shown here.
import java.io.Closeable
import java.net.{SocketException, SocketTimeoutException, UnknownHostException}
import javax.net.ssl.SSLException

/** Defines some high-level operations for interacting with remote file protocols like FTP, SFTP, etc.
  */
trait RemoteFileOperations extends Closeable {
  var backoff: BlockingRetry.Backoff = Backoff.linear(3000)
  var retry: BlockingRetry.Retry = Retry.maxRetries(3)
  var recover: Recovery = recoverable(this)
  var ignore: Ignored = nonRecoverable

  def listFiles(path: String = ""): Seq[FInfo]

  def uploadFile(localPath: String, remoteDirectory: String): Unit

  def downloadFile(localPath: String, remotePath: String): Unit

  def deleteAll(path: String): Unit

  def connect(): Unit = {}

  def disconnect(): Unit = {}

  def reconnect(): Unit = {
    disconnect()
    connect()
  }

  override def close(): Unit = disconnect()

  /** Wraps a block of code and allows it to be retried when [[recoverable()]] conditions
    * are met. [[BlockingRetry.retry()]] is called with the var fields
    * [[backoff]], [[retry]], [[recover]], and [[ignore]], which can all be reconfigured.
    */
  def retryable[A](f: => A): A = {
    BlockingRetry.retry(retry, backoff, recover, ignore) {
      f
    }
  }

  def recoverable(fileOp: RemoteFileOperations): Recovery = {
    case (_: SocketTimeoutException, _: Int) =>
      fileOp.reconnect()
      None
  }

  def nonRecoverable: Ignored = {
    case _: UnknownHostException |
         _: SSLException |
         _: SocketException |
         _: IllegalStateException =>
  }
}

class SSHJClient(host: String, username: String, password: String) extends RemoteFileOperations {

  import net.schmizz.keepalive.KeepAliveProvider
  import net.schmizz.sshj.connection.ConnectionException
  import net.schmizz.sshj.sftp.SFTPClient
  import net.schmizz.sshj.transport.verification.PromiscuousVerifier
  import net.schmizz.sshj.xfer.FileSystemFile
  import net.schmizz.sshj.{DefaultConfig, SSHClient}

  override def listFiles(path: String): Seq[FInfo] = {
    import collection.JavaConverters._
    retryable {
      sftpSession(sftp => {
        sftp.ls(path).asScala
          .filter(f => f.getName != "." && f.getName != "..")
          .map(f => FInfo(f.getPath, f.getParent, f.isDirectory, f.getAttributes.getSize, f.getAttributes.getMtime))
      })
    }
  }

  override def uploadFile(localPath: String, remoteDirectory: String): Unit = {
    retryable {
      sftpSession(sftp => {
        sftp.getFileTransfer.setPreserveAttributes(false)
        sftp.put(new FileSystemFile(localPath), remoteDirectory)
      })
    }
  }

  override def downloadFile(localPath: String, remotePath: String): Unit = {
    retryable {
      sftpSession(sftp => {
        sftp.getFileTransfer.setPreserveAttributes(false)
        sftp.get(remotePath, new FileSystemFile(localPath))
      })
    }
  }

  override def deleteAll(path: String): Unit =
    throw new UnsupportedOperationException("#deleteAll is unsupported for SSHJClient")

  private def sftpSession[A](f: SFTPClient => A): A = {
    val defaultConfig = new DefaultConfig()
    defaultConfig.setKeepAliveProvider(KeepAliveProvider.KEEP_ALIVE)

    val ssh = new SSHClient(defaultConfig)
    try {
      // This is equivalent to StrictHostKeyChecking=no, which is disabled since we don't usually know
      // the SSH remote host key ahead of time.
      ssh.addHostKeyVerifier(new PromiscuousVerifier())
      ssh.connect(host)
      ssh.authPassword(username, password)

      val sftp = ssh.newSFTPClient()
      try {
        f(sftp)
      } finally {
        sftp.close()
      }
    } finally {
      ssh.disconnect()
    }
  }

  override def recoverable(fileOp: RemoteFileOperations): Recovery = {
    super.recoverable(fileOp).orElse {
      case (e: ConnectionException, _: Int) =>
        println(s"Recovering session from exception: $e")
        None
    }
  }
}
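As a rough Python counterpart to the upload path above (not from the original answer: paramiko stands in for sshj, and host-key checking is relaxed the same way PromiscuousVerifier does):

import time
import paramiko

def sftp_upload(host, username, password, local_path, remote_path, tries=3):
    """Upload a file over SFTP, retrying transient failures with a fixed backoff."""
    for attempt in range(1, tries + 1):
        ssh = paramiko.SSHClient()
        # Equivalent of StrictHostKeyChecking=no / PromiscuousVerifier.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(host, username=username, password=password)
            sftp = ssh.open_sftp()
            try:
                sftp.put(local_path, remote_path)
                return
            finally:
                sftp.close()
        except (paramiko.SSHException, OSError) as exc:
            if attempt == tries:
                raise
            print(f"Upload failed ({exc}); reconnecting and retrying.")
            time.sleep(3)
        finally:
            ssh.close()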

For the question "apache-spark - Is there a way to load a file from an FTP server using TLS with Spark", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/58919377/
