I have the Impala queries below in a single file at an HDFS location.
I am trying to read that file and expand the last query whenever I encounter the end_date "2211-01-01".
The list of dates is:
final_dates = ["2022-11-01", "2023-02-01", "2023-05-01", "2023-08-01"]
insert overwrite table randome_database.random_table
partition (end_date)
select * from randome_database.random_table_stg
where end_date >= "2022-04-01" and end_date < "2022-07-01";
insert overwrite table randome_database.random_table
partition (end_date)
select * from randome_database.random_table_stg
where end_date >= "2022-07-01" and end_date < "2022-08-01";
insert overwrite table randome_database.random_table
partition (end_date)
select * from randome_database.random_table_stg
where end_date >= "2022-08-01" and end_date < "2211-01-01";
file = spark.read.text("/usr/ihitha/var/hive/test_script.py")
How do I read the file in PySpark and generate the new queries from the dates in the list?
Expected output:
insert overwrite table randome_database.random_table
partition (end_date)
select * from randome_database.random_table_stg
where end_date >= "2022-04-01" and end_date < "2022-07-01";
insert overwrite table randome_database.random_table
partition (end_date)
select * from randome_database.random_table_stg
where end_date >= "2022-07-01" and end_date < "2022-08-01";
insert overwrite table randome_database.random_table
partition (end_date)
select * from randome_database.random_table_stg
where end_date >= "2022-08-01" and end_date < "2022-11-01";
insert overwrite table randome_database.random_table
partition (end_date)
select * from randome_database.random_table_stg
where end_date >= "2022-11-01" and end_date < "2023-02-01";
insert overwrite table randome_database.random_table
partition (end_date)
select * from randome_database.random_table_stg
where end_date >= "2023-02-01" and end_date < "2023-05-01";
insert overwrite table randome_database.random_table
partition (end_date)
select * from randome_database.random_table_stg
where end_date >= "2023-05-01" and end_date < "2023-08-01";
insert overwrite table randome_database.random_table
partition (end_date)
select * from randome_database.random_table_stg
where end_date >= "2023-08-01" and end_date < "2211-01-01";
You need to include a snippet of what you have already tried, along with any error messages you encountered. That helps improve the quality of the question.
Here is a code snippet to get you started:
from functools import reduce
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import lit

# Initialize SparkSession
spark = SparkSession.builder.appName("DateExpansion").getOrCreate()
# Define the list of dates
final_dates = ["2022-11-01", "2023-02-01", "2023-05-01", "2023-08-01"]
# Read the file
file_path = "/usr/ihitha/var/hive/test_script.py"
file_content = spark.read.text(file_path)
# Filter out the lines that don't contain the end_date condition
filtered_content = file_content.filter(file_content.value.contains('end_date >=') & file_content.value.contains('end_date <'))
# Iterate over consecutive pairs in final_dates and generate one query per pair
new_queries = []
for i in range(len(final_dates) - 1):
    start_date = final_dates[i]
    end_date = final_dates[i + 1]
    # limit(1) keeps a single row per generated query rather than one per matching line
    new_query = filtered_content.limit(1).withColumn("value", lit(f'select * from randome_database.random_table_stg where end_date >= "{start_date}" and end_date < "{end_date}";')).select("value")
    new_queries.append(new_query)
# Merge the queries; union() combines two DataFrames at a time, so fold it over the list
final_queries = reduce(DataFrame.union, new_queries)
# Show the final queries
final_queries.show(truncate=False)
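The expected output in the question also keeps the earlier queries unchanged and splits only the statement whose upper bound is "2211-01-01". A minimal sketch of that step in plain Python follows, assuming the file is small enough to hold as one string, that statements are separated by `;`, and that `final_dates` is sorted and later than the statement's start date. The names `expand_queries` and `SENTINEL` are hypothetical and not part of the original post.

```python
import re

SENTINEL = "2211-01-01"  # open-ended upper bound used in the question

# Matches: end_date >= "<start>" and end_date < "<end>"
CONDITION = re.compile(
    r'end_date\s*>=\s*"(\d{4}-\d{2}-\d{2})"\s+and\s+end_date\s*<\s*"(\d{4}-\d{2}-\d{2})"',
    re.IGNORECASE,
)


def expand_queries(sql_text, final_dates, sentinel=SENTINEL):
    """Return the statements in sql_text, splitting the one that ends at sentinel."""
    # Split on ';' and drop empty fragments
    statements = [s.strip() for s in sql_text.split(";") if s.strip()]
    expanded = []
    for stmt in statements:
        match = CONDITION.search(stmt)
        if match is None or match.group(2) != sentinel:
            # Not the open-ended statement: keep it unchanged
            expanded.append(stmt + ";")
            continue
        # Boundaries: original start date, the new dates, then the sentinel
        bounds = [match.group(1)] + list(final_dates) + [sentinel]
        for start, end in zip(bounds, bounds[1:]):
            condition = f'end_date >= "{start}" and end_date < "{end}"'
            expanded.append(stmt[:match.start()] + condition + stmt[match.end():] + ";")
    return expanded


if __name__ == "__main__":
    demo = 'select * from t where end_date >= "2022-08-01" and end_date < "2211-01-01";'
    for query in expand_queries(demo, ["2022-11-01", "2023-02-01"]):
        print(query)
```

To feed it the file from the question, one option is to collect the `value` column of `spark.read.text(file_path)` on the driver and join the lines with newlines; this assumes the rows come back in file order, which is usually the case for a single small file but is worth checking.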
Merge is not working here.
TypeError: union() takes 2 positional arguments but 7 were given
@Reddy I have modified the answer. Let me know if that sorts it out for you now.