- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我有一个包含如下行的文件(文件名:sample.csv
)
Id,Query
T1012,"Select * from employee_dim limit 100"
T1212,"Select * from department_dim limit 100"
T1231,"Select dept_number,location,dept_name from locations"
我需要遍历此文件 (sample.csv
) 并获取第二列(“query”),在 hive 数据库中运行它并获取结果,然后将其保存到名为 T1012_result.csv
的新文件,并对所有行执行类似操作。
你能帮忙吗?
我尝试通过 spark 读取文件并将其转换为列表,然后使用无效的 sparksession 执行 SQL 查询。
from pyspark.sql import SparkSession,HiveContext
spark=SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sql("use sample")
input=spark.read.csv("sample.csv")
#input.select('_c1').show()
import pandas as pd
a=input.toPandas().values.tolist()
for i in a :
print i[1]
spark.sql('pd.DataFrame(i)')
最佳答案
更新:spark
file_path="file:///user/vikrant/inputfiles/multiquery.csv"
df=spark.read.format("com.databricks.spark.csv").option("header", "true").load(file_path)
+---+-------------------------------+
|id |query |
+---+-------------------------------+
|1 |select * from exampledate |
|2 |select * from test |
|3 |select * from newpartitiontable|
+---+-------------------------------+
def customFunction(row):
for row in df.rdd.collect():
item=(row[1])
filename=(row[0])
query=""
query+=str(item)
newdf=spark.sql(query)
savedataframe(newdf,filename)
def savedataframe(newdf,filename):
newdf.coalesce(1).write.csv("/user/dev/hadoop/external/files/file_" + filename + ".csv")
customFunction(df)
drwxr-xr-x - vikct001 hdfs 0 2019-08-02 11:49 /user/dev/hadoop/external/files/file_1.csv
drwxr-xr-x - vikct001 hdfs 0 2019-08-02 11:49 /user/dev/hadoop/external/files/file_2.csv
drwxr-xr-x - vikct001 hdfs 0 2019-08-02 11:49 /user/dev/hadoop/external/files/file_3.csv
更新:使用 Pandas 我在 sql server 上有几个测试表,我正在将它们读入你在问题中提到的 pandas 数据框,并将查询结果保存到每个不同的文件中,并重命名为数据框的第一列:
import pandas as pd
import pyodbc
from pandas import DataFrame
connection = pyodbc.connect('Driver={ODBC Driver 13 for SQL Server};SERVER=yourservername;DATABASE=some_db;UID=username;PWD=password')
cursor = connection.cursor()
data=[['1','select * from User_Stage_Table'],['2','select * from User_temp_Table']]
df=pd.DataFrame(data,columns=['id','query'])
def get_query(df):
a=df.values.tolist()
for i in a:
query=i[1] #reading second column value as query
filename=i[0] #reading first column value as filename
write_query(query,filename) #calling write_query function
def write_query(query,filename):
df=pd.read_sql_query(query,connection)
df.to_csv(outfile_location+filename+".txt",sep=',',encoding='utf-8',index=None,mode='a')
get_query(df) #calling get_query function to build the query
out_file_location='G:\Testing\OutputFile\outfile'
您的输出文件名为:
outfile1.txt
#这将包含表 User_Stage_Table
outfile2.txt
#这将包含表 User_temp_Table'
如果这能解决您的问题或遇到任何进一步的问题,请告诉我。
关于PythonSpark : need to execute hive queries from file columns,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57315590/
以下哪一个更好(EJB 3 JPA) //查询 一个)。 getEntityManager().createQuery("select o from User o"); //命名查询,其中 findA
也许其他人和我有同样的问题。我遇到了错误: Cannot execute queries while other unbuffered queries are active.Consider usin
我的代码 package com.tl666.elasticsearch.pojo; import lombok.AllArgsConstructor; import lombok.Data; imp
简短版:我想查询另一个查询的结果,以便选择更有限的结果集。但是,添加 where 子句会重写第一个查询而不是处理结果,因此我得不到我需要的答案。 详情:我有两个模型,支票和蜱虫。检查 has_many
我正在尝试使用 Doctrine 在 Symfony 框架中执行原始查询。 这是代码: class MessagesHandler { /** @var \Doctrine\Common\Pe
我正在运行以下两个语句: 首先是 A) 它做它需要做的事情并工作: SELECT itemColumn ,valueColumn ,label FROM rstCombinedChartD
我有一个脚本来查询数据库以获取订单信息,然后查询该查询以获取订单总数。代码看起来像这样。 SELECT oi.OrderQty, oi.ItemPrice FROM Ord
这个问题在这里已经有了答案: MySQL Insert query doesn't work with WHERE clause (31 个答案) 关闭 4 年前。 我正在从 php 更新数据库中的
在使用 Skygear JS SDK 时,查询是否返回数组? readDummy: function(){ const Test = skygear.Record.extend('
我想在一个表上运行 MySQL 查询,然后在该表上运行子查询。我有一个对象列表。每个对象都有一个主要版本和一个次要版本。对于一个对象,我试图找到该对象的“最后版本”:这意味着我想找到该对象的最大值(主
我正在尝试在 pod 中启动 prometheus,并在 k8s 中使用持久卷。 当我启动 pod 时,我看到: level=info ts=2021-09-12T13:58:13.120Z ca
基本上,我从 kube-prometheus-stack 安装了 Prometheues-Grafana使用提供的 helm chart repo prometheus-community # hel
是否可以根据另一个查询的结果在 TFS 2010 中创建新查询? 例如,一个(父)查询选择位于某个工作项下的所有工作项(假设 ID=5 的工作项)。现在我想创建其他查询,从第一个查询的结果中选择所有错
在 Delphi 中,每当我使用 TQuery 对数据库执行 SELECT 时,我都会在 Query.Open 后面加上 try..finally,并在finally 部分中使用 Query.Clos
我只是从一台服务器移动到另一台服务器。我的脚本在旧服务器上运行良好,但是我开始在新服务器上收到此错误: "Declaration of ezSQL_mysql::query() should be c
我想问一下有什么区别 for row in session.Query(Model1): pass 和 for row in session.Query(Model1).all():
如何使用注释通过spring-data-elasticsearch进行@Query(value =“{” query“:”“}”)的聚合? 最佳答案 您不能使用@Query注释来完成此操作,该注释的唯
我有一个对可变字符串执行 LIKE 条件的查询: 当变量包含一个包含单引号的单词时,返回一些结果,但不是全部: SELECT ID FROM MyQoQ
我有我的查询范围,它返回数百条记录。我需要在 Controller 中使用不同的过滤器查询这个集合。 我怎样才能做到这一点?可能吗? 查询范围: Client::join('transactions_
我有这样的数据库模式 用户 编号 初中生 文档 编号 标题 user_id(用户的外键) 模式(可以接受 PUBLIC 或 PRIVATE) 我想检索所有公开的文档和属于给定用户(矩阵)的所有文档 我
我是一名优秀的程序员,十分优秀!