- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在将 python 进程移动到 Spark。在 python 中,我们使用 ftplib 连接文件并将其下载到 EC2 实例。下载文件后,我们将上传到 S3。我们正在过渡到无服务器基础设施,并希望通过 AWS Glue 在 spark 中加载文件,然后使用多部分上传将其移动到 S3。我试图在更大的胶水实例类型中运行当前代码,但机器仍然内存不足(20gb 文件)。
旧的 python 代码
"""
This script will get the backup file
"""
import sys
from datetime import datetime
import re
import ftplib
from retry import retry
import shutil
from tools.python.s3_functions import s3_upload
from python_scripts.get import *
def get_ftp_connector(path, user, password):
ftp = ftplib.FTP_TLS(path)
ftp.login(user, password)
ftp.prot_p()
return ftp
def get_ftp_files_list(ftp, dir):
ftp.cwd(dir)
files = ftp.nlst()
print(str("-".join(files)))
if "filecompleted.txt" not in files:
print("Failed to find filescompleted.txt file in ftp server.")
raise Exception("Failed to find filescompleted.txt file in ftp server.")
regex_str = 'Backup_File_Mask_Goes_here([\d]{8}).bak'
find_date_regex = re.compile(regex_str)
searched = [(f, find_date_regex.match(f)) for f in files if find_date_regex.match(f)]
searched = \
[(file_name, datetime.strptime(regex_result.groups()[0], '%Y%m%d')) for file_name, regex_result in searched]
searched = sorted(searched, key=lambda elem: elem[1], reverse=True)
if not searched:
print("Failed to find appropriate file in ftp server.")
raise Exception("Failed to find appropriate file in ftp server.")
return searched[0]
class FtpUploadTracker:
size_written = 0
total_size = 0
last_shown_percent = "X"
def __init__(self, total_size, bk_file):
self.total_size = total_size
self.bk_file = bk_file
self.output_file = open(self.bk_file, 'wb')
self.start_time = datetime.now()
def handle(self, block):
self.size_written += len(block)
percent_complete = str(round((self.size_written / self.total_size) * 100, 1))
self.output_file.write(block)
time_elapsed = (datetime.now() - self.start_time).total_seconds()
speed = round(self.size_written / (1000 * 1000 * time_elapsed), 2)
msg = "{percent}% complete @ average speed of {speed}MB/s : total run time {minutes}m".\
format(percent=percent_complete, speed=speed, minutes=round(time_elapsed/60))
if time_elapsed > 600 and speed < 1:
print("Zombie connection, failing dl.")
raise Exception("Zombie connection, failing dl.")
if self.last_shown_percent != percent_complete:
self.last_shown_percent = percent_complete
print(msg)
def close(self):
self.output_file.close()
@retry(tries=4, delay=300)
def retrieve_db():
"""
This function will retrieve via FTP the backup
:return: None
"""
ftp = get_ftp_connector(FTP_PATH, FTP_USER, FTP_PASSWORD)
# return back the most recent entry
file_name, file_date = get_ftp_files_list(ftp, 'database')
file_epoch = (file_date - datetime(1970, 1, 1)).total_seconds()
new_file_name = "backup_{epoch}.bak".format(epoch=str(int(file_epoch)))
if os.path.exists(DATAFILEPATH):
shutil.rmtree(DATAFILEPATH)
if not os.path.exists(DATAFILEPATH):
os.makedirs(DATAFILEPATH)
temp_backup_file_location = os.path.join(DATAFILEPATH + new_file_name)
print("Found file {file_name}, and downloading it to {loc}".
format(file_name=file_name, loc=temp_backup_file_location))
ftp_handler = FtpUploadTracker(ftp.size(file_name), temp_backup_file_location)
ftp.retrbinary("RETR " + file_name, ftp_handler.handle)
ftp.quit()
ftp_handler.close()
print("Finished download. Uploading to S3.")
s3_upload(DATAFILEPATH, new_file_name, bucket, "db_backup")
os.remove(temp_backup_file_location)
def main():
try:
retrieve_db()
except Exception as e:
print("Failed to download backup after 4 tries with error {e}.".format(e=e))
return 1
return 0
if __name__ == "__main__":
rtn = main()
sys.exit(rtn)
新的 Spark 代码(进行中):用户名有一个 |使我对 uri 进行编码的字符。当我运行代码时,连接被拒绝。我能够为 python 使用相同的连接信息。
from pyspark import SparkContext
from pyspark import SparkFiles
import urllib
sc = SparkContext()
ftp_path = "ftp://Username:password@ftplocation.com/path_to_file"
file_path_clean = urllib.parse.urlencode(ftp_path, safe='|')
print(f"file_path_clean: {file_path_clean}")
sc.addFile(ftp_path)
filename = SparkFiles.get(file_path.split('/')[-1])
print(f"filename: {filename}")
rdd = sc.textFile("file://" + filename)
print("We got past rdd = sc.textFile(file:// + filename)")
rdd.take(10)
rdd.collect()
print(rdd)
最佳答案
解决问题的三种方法:
使用由 FTP 支持的挂载文件系统,并从 Spark 写入文件系统。
使用 Spark 到 SFTP 连接器,例如 spark-sftp .
使用 Spark 将文件写入其他地方,然后作为单独的步骤复制到 SFTP。由于 SFTP 的各种可靠性问题以及 Spark 在失败的写入操作期间留下部分输出的事实,这是我们采用的路径。我们在 Scala 中使用类似于以下代码的代码将 TB 写入 SFTP 端点。希望对大家的Python工作有所帮助。
/** Defines some high-level operations for interacting with remote file protocols like FTP, SFTP, etc.
*/
trait RemoteFileOperations extends Closeable {
var backoff: BlockingRetry.Backoff = Backoff.linear(3000)
var retry: BlockingRetry.Retry = Retry.maxRetries(3)
var recover: Recovery = recoverable(this)
var ignore: Ignored = nonRecoverable
def listFiles(path: String = ""): Seq[FInfo]
def uploadFile(localPath: String, remoteDirectory: String): Unit
def downloadFile(localPath: String, remotePath: String): Unit
def deleteAll(path: String): Unit
def connect(): Unit = {}
def disconnect(): Unit = {}
def reconnect(): Unit = {
disconnect()
connect()
}
override def close(): Unit = disconnect()
/** Wraps a block of code and allows it to be retried when [[recoverable()]] conditions
* are met. [[BlockingRetry.retry()]] is called with the var fields
* [[backoff]], [[retry]], [[recover]], and [[ignore]], which can all be reconfigured.
*/
def retryable[A](f: => A): A = {
BlockingRetry.retry(retry, backoff, recover, ignore) {
f
}
}
def recoverable(fileOp: RemoteFileOperations): Recovery = {
case (_: SocketTimeoutException, _: Int) =>
fileOp.reconnect()
None
}
def nonRecoverable: Ignored = {
case _: UnknownHostException |
_: SSLException |
_: SocketException |
_: IllegalStateException =>
}
}
class SSHJClient(host: String, username: String, password: String) extends RemoteFileOperations {
import net.schmizz.keepalive.KeepAliveProvider
import net.schmizz.sshj.connection.ConnectionException
import net.schmizz.sshj.sftp.SFTPClient
import net.schmizz.sshj.transport.verification.PromiscuousVerifier
import net.schmizz.sshj.xfer.FileSystemFile
import net.schmizz.sshj.{DefaultConfig, SSHClient}
override def listFiles(path: String): Seq[FInfo] = {
import collection.JavaConverters._
retryable {
sftpSession(sftp => {
sftp.ls(path).asScala
.filter(f => f.getName != "." && f.getName != "..")
.map(f => FInfo(f.getPath, f.getParent, f.isDirectory, f.getAttributes.getSize, f.getAttributes.getMtime))
})
}
}
override def uploadFile(localPath: String, remoteDirectory: String): Unit = {
retryable {
sftpSession(sftp => {
sftp.getFileTransfer.setPreserveAttributes(false)
sftp.put(new FileSystemFile(localPath), remoteDirectory)
})
}
}
override def downloadFile(localPath: String, remotePath: String): Unit = {
retryable {
sftpSession(sftp => {
sftp.getFileTransfer.setPreserveAttributes(false)
sftp.get(remotePath, new FileSystemFile(localPath))
})
}
}
override def deleteAll(path: String): Unit =
throw new UnsupportedOperationException("#deleteAll is unsupported for SSHJClient")
private def sftpSession[A](f: SFTPClient => A): A = {
val defaultConfig = new DefaultConfig()
defaultConfig.setKeepAliveProvider(KeepAliveProvider.KEEP_ALIVE)
val ssh = new SSHClient(defaultConfig)
try {
// This is equivalent to StrictHostKeyChecking=no which is disabled since we don't usually know
// the SSH remote host key ahead of time.
ssh.addHostKeyVerifier(new PromiscuousVerifier())
ssh.connect(host)
ssh.authPassword(username, password)
val sftp = ssh.newSFTPClient()
try {
f(sftp)
} finally {
sftp.close()
}
} finally {
ssh.disconnect()
}
}
override def recoverable(fileOp: RemoteFileOperations): Recovery = {
super.recoverable(fileOp).orElse {
case (e: ConnectionException, _: Int) =>
println(s"Recovering session from exception: $e")
None
}
}
}
关于apache-spark - 有没有一种方法可以使用 Spark 使用 TLS 在 FTP 中加载文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58919377/
根据 FTP 协议(protocol)(rfc 959),当 ftp 客户端连接到 ftp 服务器时,应该在 ftp 客户端和 ftp 服务器之间建立控制连接。而当ftp客户端发送{LIST, R
是否可以使用 FTP 命令重命名 FTP 服务器上的文件夹? 我知道有一个用于文件重命名的 Rename 命令,但是我可以将它用于文件夹名称吗? 最佳答案 AFAIK,相同的命令( RNFR/RNTO
我有一个 ftp://host/path URL,我想下载文件并在 Erlang 中连接丢失时继续下载。 使用 ftp 开始下载非常简单模块,但如何恢复它? 最佳答案 是的..就像 Peer 提到的.
我一直在阅读 FTP 规范并使用 Wireshark 来捕获我的 FTP 客户端发送/接收的数据包,并有一些关于它们的问题。 首先是来自我的 FTP 服务器的“连接问候语”(如 FTP RFC 所称)
我有一个 ColdFusion 应用程序,用于在开发和生产服务器之间传输文件。实际发送文件的代码如下: ftp = new Ftp(); ftp.setUsername(username); ftp.
我正在尝试连接到允许匿名访问的 FTP 服务器,但我不知道如何指定执行此操作所需的适当用户名/密码。 我尝试过使用匿名/匿名作为用户/通行证,但没有成功,以及空字符串和两者的各种组合等。 这一定是我所
ftp rstatus $remotefile 在Solaris 上出现“?无效命令”错误。我发现,与 HP-UX 不同,Solaris 10 上没有像 rstatus 这样的 ftp 命令。基本上在
我是 Spring 的新手,我目前正在研究 spring 与 ftp 支持的集成。 我从本地目录传输到服务器 (filZilla)。 我从服务器下载了文件,没问题。 但我想知道如何将文件从 FTP 服
我想通过加密连接 FTP,需要使用 PHP 代码通过 TLS 隐式 FTP。 我已经尝试使用普通 FTP 进行加密,它可以工作,但加密需要通过 TLS 的隐式 FTP 不起作用。 最佳答案 尝试使用下
我已经成功使用 LuaSocket 的 TCP 工具,但我在使用它的 FTP 模块时遇到了问题。尝试检索(小)文件时,我总是超时。我可以在被动模式下使用 Firefox 或 ftp 下载文件(在 Ub
我尝试使用 putty 使用 FTP 详细信息主机名、用户名和密码登录到服务器。但是当我输入密码时它显示拒绝访问。 对于我的另一个网站,我输入了我的主机名并单击在腻子中打开,它显示“网络错误:连接超时
只是我,还是 FTP 看起来有点过时?它看起来很慢而且效率低下,而且它已经有 30 多年的历史了,并不是所有的旧东西都是坏的 :) 有哪些协议(protocol)可能成为 FTP 的继任者? 我用过一
我有一个有点相关但不同的问题 here . 我有一个批处理脚本( *.bat 文件),例如: @ftp -i -s:"%~f0"&GOTO:EOF open ftp.myhost.com myuser
我正在使用 IBM Mainframe TSO 从数据集中查看文件。最近有人告诉我每天开始将最新一代的数据集通过 FTP 传输到我桌面上的文件夹中。问题是我的 FTP 脚本只允许我用我输入的确切名称
我正在尝试使用 atom 包“Remote-FTP”和私钥连接到我的服务器。 我在我的服务器上设置了 SSH key ,并且可以使用腻子成功连接。 私钥保存在我的项目文件夹中,我有一个现有的 .ftp
我的 ftp 文件夹中有一组文件。我只能访问 ftp 模式。我想将那些扩展名为 .txt 的文件重命名为 .done 例如: 1.txt, 2.txt, 3.txt 到 1.done, 2.done,
lcd 更改本地目录。 ls 列出远程目录上的文件。 我想要的是lls,列出本地目录上的文件。 这可能吗? 我知道我总是可以打开另一个终端来执行此操作,但我很懒! 最佳答案 是的: !dir ! 告诉
关闭。这个问题是opinion-based 。目前不接受答案。 想要改进这个问题吗?更新问题,以便 editing this post 可以用事实和引文来回答它。 . 已关闭 9 年前。 社区去年审查
我的 FTP 测试服务器有问题。我已经安装并配置了 FileZilla 服务器,它正在监听端口 21 上的控制连接,然后它可以在 50100 和 51100 之间的端口上提供被动模式数据连接。 我正在
我正在运行 Filezilla Server 0.9.45 beta 来远程管理我的服务器。设置完成后,我测试使用 IP 127.0.0.1 连接到它,并且工作成功。但是,为了远程连接到服务器,我将端
我是一名优秀的程序员,十分优秀!