
python - Getting a function's logs separately on each iteration in Python


I have a PySpark script like the one below. In this script I loop over an input file to get the table names and run the code for each of them.

Now I want to collect the logs of each iteration of the function mysql_spark separately.

For example:

Input file:

table1
table2
table3

Currently, when I run the PySpark script, the logs for all three tables are written to a single file.

What I want instead is three separate log files, one per table.

PySpark script:

#!/usr/bin/env python
import sys
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

# Check that the expected number of arguments was passed on the spark-submit command line
if len(sys.argv) != 5:
    print "Invalid number of args......"
    print "Usage: spark-submit import.py Arguments"
    exit()
args_file = sys.argv[1]
hivedb = sys.argv[2]
mysqldb = sys.argv[3]
mysqltable = sys.argv[4]

def mysql_spark(table, hivedb, mysqldb, mysqltable):

    print "*********************************************************table = {} ***************************".format(table)

    df = sqlContext.table("{}.{}".format(mysqldb, mysqltable))

    df.registerTempTable("mytempTable")

    sqlContext.sql("create table {}.{} as select * from mytempTable".format(hivedb, table))

input = sc.textFile('/user/XXXXXXXX/mysql_spark/%s' % args_file).collect()

for table in input:
    mysql_spark(table, hivedb, mysqldb, mysqltable)

sc.stop()

Shell script that calls the PySpark script:

#!/bin/bash

source /home/$USER/mysql_spark/source.sh
[ $# -ne 1 ] && { echo "Usage : $0 table ";exit 1; }

args_file=$1

TIMESTAMP=`date "+%Y-%m-%d"`
touch /home/$USER/logs/${TIMESTAMP}.success_log
touch /home/$USER/logs/${TIMESTAMP}.fail_log
success_logs=/home/$USER/logs/${TIMESTAMP}.success_log
failed_logs=/home/$USER/logs/${TIMESTAMP}.fail_log

# Function to log the status of the job
function log_status
{
    status=$1
    message=$2
    if [ "$status" -ne 0 ]; then
        echo "`date +\"%Y-%m-%d %H:%M:%S\"` [ERROR] $message [Status] $status : failed" | tee -a "${failed_logs}"
        exit 1
    else
        echo "`date +\"%Y-%m-%d %H:%M:%S\"` [INFO] $message [Status] $status : success" | tee -a "${success_logs}"
    fi
}

spark-submit --name "${args_file}" --master "yarn-client" /home/$USER/mysql_spark/mysql_spark.py ${args_file} ${hivedb} ${mysqldb} ${mysqltable}

g_STATUS=$?
log_status $g_STATUS "Spark job ${args_file} Execution"

Sample log file:

Connection to spark
***************************table = table 1 ********************************
created dataframe
created table
delete temp directory
***************************table = table 2 ********************************
created dataframe
created table
delete temp directory
***************************table = table 3 ********************************
created dataframe
created table
delete temp directory

Expected output:

table1.logfile

Connection to spark
***************************table = table 1 ********************************
created dataframe
created table
delete temp directory

table2.logfile

***************************table = table 2 ********************************
created dataframe
created table
delete temp directory

table3.logfile

***************************table = table 3 ********************************
created dataframe
created table
delete temp directory
shutdown sparkContext

How can I achieve this?

Is it even possible?

Best answer

You can create a new file on each iteration and write the data to it.

Here is a simple example:

lis = ['table1', 'table2']

for table in lis:
    logfile = open(str(table) + ".logfile", 'w')
    logfile.write(str(table))
    logfile.close()

If you apply the same idea in your code and pass the file object into the mysql_spark function, it should work for each iteration:

for table in input:
    logfile = open(str(table) + ".logfile", 'w')
    mysql_spark(table, hivedb, mysqldb, mysqltable, logfile)
    logfile.close()
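
For this to work, mysql_spark itself also has to accept the extra logfile argument and write its progress messages to that file instead of printing to stdout. A minimal sketch of such an adaptation, reusing the messages from the sample log above ("created dataframe", "created table"); the exact wording and any additional steps are up to you:

def mysql_spark(table, hivedb, mysqldb, mysqltable, logfile):

    # Banner goes into this table's own log file instead of stdout
    logfile.write("**************** table = {} ****************\n".format(table))

    df = sqlContext.table("{}.{}".format(mysqldb, mysqltable))
    logfile.write("created dataframe\n")

    df.registerTempTable("mytempTable")

    sqlContext.sql("create table {}.{} as select * from mytempTable".format(hivedb, table))
    logfile.write("created table\n")

Anything Spark itself prints (executor logs, the "Connection to spark" banner, etc.) will still go to the driver's console/log; only the messages you write to the file object end up in the per-table log files.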

Regarding python - getting a function's logs separately on each iteration in Python, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/45378619/
