- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我有一个包含如下行的文件(文件名:sample.csv
)
Id,Query
T1012,"Select * from employee_dim limit 100"
T1212,"Select * from department_dim limit 100"
T1231,"Select dept_number,location,dept_name from locations"
我需要遍历此文件 (sample.csv
) 并获取第二列(“query”),在 hive 数据库中运行它并获取结果,然后将其保存到名为 T1012_result.csv
的新文件,并对所有行执行类似操作。
你能帮忙吗?
我尝试通过 spark 读取文件并将其转换为列表,然后使用无效的 sparksession 执行 SQL 查询。
from pyspark.sql import SparkSession,HiveContext
spark=SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sql("use sample")
input=spark.read.csv("sample.csv")
#input.select('_c1').show()
import pandas as pd
a=input.toPandas().values.tolist()
for i in a :
print i[1]
spark.sql('pd.DataFrame(i)')
最佳答案
更新:spark
file_path="file:///user/vikrant/inputfiles/multiquery.csv"
df=spark.read.format("com.databricks.spark.csv").option("header", "true").load(file_path)
+---+-------------------------------+
|id |query |
+---+-------------------------------+
|1 |select * from exampledate |
|2 |select * from test |
|3 |select * from newpartitiontable|
+---+-------------------------------+
def customFunction(row):
for row in df.rdd.collect():
item=(row[1])
filename=(row[0])
query=""
query+=str(item)
newdf=spark.sql(query)
savedataframe(newdf,filename)
def savedataframe(newdf,filename):
newdf.coalesce(1).write.csv("/user/dev/hadoop/external/files/file_" + filename + ".csv")
customFunction(df)
drwxr-xr-x - vikct001 hdfs 0 2019-08-02 11:49 /user/dev/hadoop/external/files/file_1.csv
drwxr-xr-x - vikct001 hdfs 0 2019-08-02 11:49 /user/dev/hadoop/external/files/file_2.csv
drwxr-xr-x - vikct001 hdfs 0 2019-08-02 11:49 /user/dev/hadoop/external/files/file_3.csv
更新:使用 Pandas 我在 sql server 上有几个测试表,我正在将它们读入你在问题中提到的 pandas 数据框,并将查询结果保存到每个不同的文件中,并重命名为数据框的第一列:
import pandas as pd
import pyodbc
from pandas import DataFrame
connection = pyodbc.connect('Driver={ODBC Driver 13 for SQL Server};SERVER=yourservername;DATABASE=some_db;UID=username;PWD=password')
cursor = connection.cursor()
data=[['1','select * from User_Stage_Table'],['2','select * from User_temp_Table']]
df=pd.DataFrame(data,columns=['id','query'])
def get_query(df):
a=df.values.tolist()
for i in a:
query=i[1] #reading second column value as query
filename=i[0] #reading first column value as filename
write_query(query,filename) #calling write_query function
def write_query(query,filename):
df=pd.read_sql_query(query,connection)
df.to_csv(outfile_location+filename+".txt",sep=',',encoding='utf-8',index=None,mode='a')
get_query(df) #calling get_query function to build the query
out_file_location='G:\Testing\OutputFile\outfile'
您的输出文件名为:
outfile1.txt
#这将包含表 User_Stage_Table
outfile2.txt
#这将包含表 User_temp_Table'
如果这能解决您的问题或遇到任何进一步的问题,请告诉我。
关于PythonSpark : need to execute hive queries from file columns,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57315590/
我想知道是否可以访问放在 tomcat 的 conf 文件夹中的文件。通常我会在这个文件中放置多个 webapp 的配置,在 war 之外。 我想使用类路径独立于文件系统。 我过去使用过 lib 文件
我有一个 PowerShell 脚本,它获取文件列表并移动满足特定条件的文件。为什么即使对象为空,foreach 循环也会运行? 我假设如果 $i 不存在,它就不会运行。但是如果 $filePath
我已将 BasicAccountRule.drl 放置在我的 Web 应用程序中,位置为:C:/workspace/exim_design/src/main/resources/rules/drl/i
我使用 File.open('file.txt').class 和 File.open('file.txt').readlines.class 以及前者进行了检查一个返回 File,后者返回 Arra
我正在尝试使用 FileOutputStream 删除文件,在其中写入内容后。这是我用来编写的代码: private void writeContent(File file, String fileC
我正在尝试使用 flink 和 python 批处理 api 测试 Wordcount 经典示例。我的问题是,将数据源从 env.from_elements() 修改为 env.read_text()
我正在尝试制作一个可以同时处理多个不同文件的程序。我的想法是制作一个包含 20 个 FILE* 的数组,以便在我达到此限制时能够关闭其中一个并打开请求的新文件。 为此,我想到了一个函数,它选择一个选项
我有两个文件A和B文件A: 976464 792992 文件B TimeStamp,Record1,976464,8383,ABCD 我想搜索文件 A 和文件 B 中的每条记录并打印匹配的记录。打印的
我有一些保存在 map 中的属性文件。示例: Map map = new HashMap<>(); map.put("1", "One"); map.put("2", "Two"); map.put(
我正在尝试找出一个脚本文件,该文件接受一个包含文件列表的文件(每一行都是一个文件路径,即 path/to/file)并将它们合并到一个文件中。 例如: list.text -- path/to/fil
为了使用 File.CreateText() 和 File.AppendText() 你必须: 通过调用这些方法之一打开流 写消息 关闭流 处理流 为了使用 File.AppendAllText()
使用rsync时,如何在使用--files-from参数复制时重命名文件?我有大约190,000个文件,在从源复制到目标时,每个文件都需要重命名。我计划将文件列表放在一个文本文件中传递给--files
我在非服务器应用程序中使用 Spring(只需从 Eclipse 中某个类的 main() 编译并运行它)。 我的问题是作为 new FileSystemXmlApplicationContext 的
QNX (Neutrino 6.5.0) 使用 ksh 的开源实现作为其 shell 。许多提供的脚本,包括系统启动脚本,都使用诸如 if ! test /dev/slog -ef /dev/slog
当我尝试打开从我的应用程序下载的 xls 文件时,出现此错误: excel cannot open the file because the file format or file extension
有一些相关的概念,即文件指针、流和文件描述符。 我知道文件指针是指向数据类型 FILE 的指针(在例如 FILE.h 和 struct_FILE.h 中声明)。 我知道文件描述符是 int ,例如成员
好吧,这应该很容易... 我是groovy的新手,我希望实现以下逻辑: def testFiles = findAllTestFiles(); 到目前为止,我想出了下面的代码,该代码可以成功打印所有文
我理解为什么以下内容会截断文件的内容: Get-Content | Out-File 这是因为 Out-File 首先运行,它会在 Get-Content 有机会读取文件之前清空文件。 但是当我尝
您好,我正在尝试将文件位置表示为变量,因为最终脚本将在另一台机器上运行。这是我尝试过的代码,然后是我得到的错误。在我看来,python 是如何添加“\”的,这就是导致问题的原因。如果是这种情况,我如何
我有一个只包含一行的输入文件: $ cat input foo bar 我想在我的脚本中使用这一行,据我所知有 3 种方法: line=$(cat input) line=$( input"...,
我是一名优秀的程序员,十分优秀!