gpt4 book ai didi

python - 使用 shell 脚本在 python 中收集函数的日志

转载 作者:塔克拉玛干 更新时间:2023-11-03 01:19:26 25 4
gpt4 key购买 nike

我的 pyspark 脚本运行良好。此脚本将从 mysql 中获取数据并在 HDFS 中创建配置单元表。

pyspark 脚本如下。

#!/usr/bin/env python
import sys
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

#Condition to specify exact number of arguments in the spark-submit command line
if len(sys.argv) != 8:
print "Invalid number of args......"
print "Usage: spark-submit import.py Arguments"
exit()
table = sys.argv[1]
hivedb = sys.argv[2]
domain = sys.argv[3]
port=sys.argv[4]
mysqldb=sys.argv[5]
username=sys.argv[6]
password=sys.argv[7]

df = sqlContext.read.format("jdbc").option("url", "{}:{}/{}".format(domain,port,mysqldb)).option("driver", "com.mysql.jdbc.Driver").option("dbtable","{}".format(table)).option("user", "{}".format(username)).option("password", "{}".format(password)).load()

#Register dataframe as table
df.registerTempTable("mytempTable")

# create hive table from temp table:
sqlContext.sql("create table {}.{} as select * from mytempTable".format(hivedb,table))

sc.stop()

现在将使用 shell 脚本调用此 pyspark 脚本。对于此 shell 脚本,我将表名作为文件中的参数传递。

shell 脚本如下。

#!/bin/bash

source /home/$USER/spark/source.sh
[ $# -ne 1 ] && { echo "Usage : $0 table ";exit 1; }

args_file=$1

TIMESTAMP=`date "+%Y-%m-%d"`
touch /home/$USER/logs/${TIMESTAMP}.success_log
touch /home/$USER/logs/${TIMESTAMP}.fail_log
success_logs=/home/$USER/logs/${TIMESTAMP}.success_log
failed_logs=/home/$USER/logs/${TIMESTAMP}.fail_log

#Function to get the status of the job creation
function log_status
{
status=$1
message=$2
if [ "$status" -ne 0 ]; then
echo "`date +\"%Y-%m-%d %H:%M:%S\"` [ERROR] $message [Status] $status : failed" | tee -a "${failed_logs}"
#echo "Please find the attached log file for more details"
exit 1
else
echo "`date +\"%Y-%m-%d %H:%M:%S\"` [INFO] $message [Status] $status : success" | tee -a "${success_logs}"
fi
}
while read -r table ;do
spark-submit --name "${table}" --master "yarn-client" --num-executors 2 --executor-memory 6g --executor-cores 1 --conf "spark.yarn.executor.memoryOverhead=609" /home/$USER/spark/sql_spark.py ${table} ${hivedb} ${domain} ${port} ${mysqldb} ${username} ${password} > /tmp/logging/${table}.log 2>&1
g_STATUS=$?
log_status $g_STATUS "Spark job ${table} Execution"
done < "${args_file}"

echo "************************************************************************************************************************************************************************"

我能够使用上述 shell 脚本为 args_file 中的每个单独的表收集日志。

现在mysql中有200多张表。我修改了 pyspark 脚本,如下所示。我创建了一个函数来处理 args_file 并执行代码。

新的spark脚本

#!/usr/bin/env python
import sys
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

#Condition to specify exact number of arguments in the spark-submit command line
if len(sys.argv) != 8:
print "Invalid number of args......"
print "Usage: spark-submit import.py Arguments"
exit()
args_file = sys.argv[1]
hivedb = sys.argv[2]
domain = sys.argv[3]
port=sys.argv[4]
mysqldb=sys.argv[5]
username=sys.argv[6]
password=sys.argv[7]

def testing(table, hivedb, domain, port, mysqldb, username, password):

print "*********************************************************table = {} ***************************".format(table)
df = sqlContext.read.format("jdbc").option("url", "{}:{}/{}".format(domain,port,mysqldb)).option("driver", "com.mysql.jdbc.Driver").option("dbtable","{}".format(table)).option("user", "{}".format(username)).option("password", "{}".format(password)).load()

#Register dataframe as table
df.registerTempTable("mytempTable")

# create hive table from temp table:
sqlContext.sql("create table {}.{} stored as parquet as select * from mytempTable".format(hivedb,table))

input = sc.textFile('/user/XXXXXXX/spark_args/%s' %args_file).collect()

for table in input:
testing(table, hivedb, domain, port, mysqldb, username, password)

sc.stop()

现在我想为 args_file 中的单个表收集日志。但我只得到一个日志文件,其中包含所有表的日志。

我怎样才能达到我的要求?还是我做的方法完全错误

New shell script:

spark-submit --name "${args_file}" --master "yarn-client" --num-executors 2 --executor-memory 6g  --executor-cores 1 --conf "spark.yarn.executor.memoryOverhead=609" /home/$USER/spark/sql_spark.py ${table} ${hivedb} ${domain} ${port} ${mysqldb} ${username} ${password} > /tmp/logging/${args_file}.log 2>&1

最佳答案

你可以做的是写一个 python将获取单个日志文件并在日志文件前面剪切一行的脚本 prints table姓名。

例如:

*************************************table=table1***************

那么下一个日志文件开始于

*************************************table=table2****************

等等。您还可以将表名作为文件名

关于python - 使用 shell 脚本在 python 中收集函数的日志,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45684000/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com