- xml - AJAX/Jquery XML 解析
- 具有多重继承的 XML 模式
- .net - 枚举序列化 Json 与 XML
- XML 简单类型、简单内容、复杂类型、复杂内容
我最近在玩 Go,想出了一个小脚本来解析日志文件并将它们插入到 Elasticsearch 中。对于每个文件,我都生成了一个这样的 goroutine:
var wg := sync.WaitGroup{}
wg.Add(len(files))
for _, file := range files {
go func(f os.FileInfo){
defer wg.Done()
ProcessFile(f.Name(), config.OriginFilePath, config.WorkingFilePath, config.ArchiveFilePath,fmt.Sprintf("http://%v:%v", config.ElasticSearch.Host, config.ElasticSearch.Port),config.ProviderIndex, config.NetworkData)
}(file)
}
wg.Wait()
在我的 processFile 中,我有发送到 Elasticsearch 的函数:
func BulkInsert(lines []string, ES *elastic.Client) (*elastic.Response, error){
r, err := ES.PerformRequest("POST", "/_bulk", url.Values{}, strings.Join(lines, "\n")+"\n")
if err != nil {
return nil, err
}
return r, nil
}
问题是我不完全理解 goroutines 是如何工作的。我的理解是发送到 Elasticsearch 会阻止我的一个 goroutines 执行。我尝试使用相同的方法生成另一个 goroutine 以使用批量插入进行 Elasticsearch :
WaitGroup
, go func(){defer wg.Done(); BulkInsert(elems, ES);}()
和 wg.Wait()
在我的函数返回之前。然而,我发现最终并不是我所有的事件都在 Elasticsearch 中结束。我认为这是由于 goroutines 在没有发送/等待批量请求完成的情况下返回。
我的问题是,我处理这个问题的方法是否正确?我可以获得更好的性能吗?
最佳答案
Can I achieve better performance?
不清楚,这取决于接收方和发送方的能力。
My question is, is my approach to this problem is correct?
这可能有助于您更好地理解 go routines,
package main
import (
"fmt"
"log"
"net/http"
"sync"
"time"
)
func main() {
addr := "127.0.0.1:2074"
srv := http.Server{
Addr: addr,
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log.Println("hit ", r.URL.String())
<-time.After(time.Second)
log.Println("done ", r.URL.String())
}),
}
fail(unblock(srv.ListenAndServe))
jobs := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
// case 1
// it creates 10 goroutines,
// that triggers 10 // concurrent get queries
{
wg := sync.WaitGroup{}
wg.Add(len(jobs))
log.Printf("starting %v jobs\n", len(jobs))
for _, job := range jobs {
go func(job int) {
defer wg.Done()
http.Get(fmt.Sprintf("http://%v/job/%v", addr, job))
}(job)
}
wg.Wait()
log.Printf("done %v jobs\n", len(jobs))
}
log.Println()
log.Println("=================")
log.Println()
// case 2
// it creates 3 goroutines,
// that triggers 3 // concurrent get queries
{
wg := sync.WaitGroup{}
wg.Add(len(jobs))
in := make(chan string)
limit := make(chan bool, 3)
log.Printf("starting %v jobs\n", len(jobs))
go func() {
for url := range in {
limit <- true
go func(url string) {
defer wg.Done()
http.Get(url)
<-limit
}(url)
}
}()
for _, job := range jobs {
in <- fmt.Sprintf("http://%v/job/%v", addr, job)
}
wg.Wait()
log.Printf("done %v jobs\n", len(jobs))
}
log.Println()
log.Println("=================")
log.Println()
// case 2: rewrite
// it creates 6 goroutines,
// that triggers 6 // concurrent get queries
{
wait, add := parallel(6)
log.Printf("starting %v jobs\n", len(jobs))
for _, job := range jobs {
url := fmt.Sprintf("http://%v/job/%v", addr, job)
add(func() {
http.Get(url)
})
}
wait()
log.Printf("done %v jobs\n", len(jobs))
}
}
func parallel(c int) (func(), func(block func())) {
wg := sync.WaitGroup{}
in := make(chan func())
limit := make(chan bool, c)
go func() {
for block := range in {
limit <- true
go func(block func()) {
defer wg.Done()
block()
<-limit
}(block)
}
}()
return wg.Wait, func(block func()) {
wg.Add(1)
in <- block
}
}
func unblock(block func() error) error {
w := make(chan error)
go func() { w <- block() }()
select {
case err := <-w:
return err
case <-time.After(time.Millisecond):
}
return nil
}
func fail(err error) {
if err != nil {
panic(err)
}
}
输出
$ go run main.go
2017/09/14 01:30:50 starting 10 jobs
2017/09/14 01:30:50 hit /job/0
2017/09/14 01:30:50 hit /job/4
2017/09/14 01:30:50 hit /job/5
2017/09/14 01:30:50 hit /job/2
2017/09/14 01:30:50 hit /job/9
2017/09/14 01:30:50 hit /job/1
2017/09/14 01:30:50 hit /job/3
2017/09/14 01:30:50 hit /job/7
2017/09/14 01:30:50 hit /job/8
2017/09/14 01:30:50 hit /job/6
2017/09/14 01:30:51 done /job/5
2017/09/14 01:30:51 done /job/4
2017/09/14 01:30:51 done /job/2
2017/09/14 01:30:51 done /job/0
2017/09/14 01:30:51 done /job/6
2017/09/14 01:30:51 done /job/9
2017/09/14 01:30:51 done /job/1
2017/09/14 01:30:51 done /job/3
2017/09/14 01:30:51 done /job/7
2017/09/14 01:30:51 done /job/8
2017/09/14 01:30:51 done 10 jobs
2017/09/14 01:30:51
2017/09/14 01:30:51 =================
2017/09/14 01:30:51
2017/09/14 01:30:51 starting 10 jobs
2017/09/14 01:30:51 hit /job/0
2017/09/14 01:30:51 hit /job/2
2017/09/14 01:30:51 hit /job/1
2017/09/14 01:30:52 done /job/2
2017/09/14 01:30:52 done /job/0
2017/09/14 01:30:52 done /job/1
2017/09/14 01:30:52 hit /job/3
2017/09/14 01:30:52 hit /job/4
2017/09/14 01:30:52 hit /job/5
2017/09/14 01:30:53 done /job/3
2017/09/14 01:30:53 done /job/4
2017/09/14 01:30:53 done /job/5
2017/09/14 01:30:53 hit /job/6
2017/09/14 01:30:53 hit /job/7
2017/09/14 01:30:53 hit /job/8
2017/09/14 01:30:54 done /job/6
2017/09/14 01:30:54 done /job/7
2017/09/14 01:30:54 done /job/8
2017/09/14 01:30:54 hit /job/9
2017/09/14 01:30:55 done /job/9
2017/09/14 01:30:55 done 10 jobs
2017/09/14 01:30:55
2017/09/14 01:30:55 =================
2017/09/14 01:30:55
2017/09/14 01:30:55 starting 10 jobs
2017/09/14 01:30:55 hit /job/0
2017/09/14 01:30:55 hit /job/1
2017/09/14 01:30:55 hit /job/4
2017/09/14 01:30:55 hit /job/2
2017/09/14 01:30:55 hit /job/3
2017/09/14 01:30:55 hit /job/5
2017/09/14 01:30:56 done /job/0
2017/09/14 01:30:56 hit /job/6
2017/09/14 01:30:56 done /job/1
2017/09/14 01:30:56 done /job/2
2017/09/14 01:30:56 done /job/4
2017/09/14 01:30:56 hit /job/7
2017/09/14 01:30:56 done /job/3
2017/09/14 01:30:56 hit /job/9
2017/09/14 01:30:56 hit /job/8
2017/09/14 01:30:56 done /job/5
2017/09/14 01:30:57 done /job/6
2017/09/14 01:30:57 done /job/7
2017/09/14 01:30:57 done /job/9
2017/09/14 01:30:57 done /job/8
2017/09/14 01:30:57 done 10 jobs
关于elasticsearch - 并发文件解析并插入到 Elastic Search 中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46204606/
我正在尝试将我的应用程序上载到Elastic Beanstalk,但是在节点预gyp安装--fallback-to-build上,npm安装失败。我尝试了各种版本的节点,但无济于事。似乎正在尝试获取一
每当我在 Elastic Beanstalk 中创建新环境时,我都会手动配置自定义 AMI ID、SNS 通知等,但我想自动完成,即,将设置(自定义 AMI ID、SNS、 key 对等)保存到一个配
我已使用以下方法连接到 Elastic Beanstalk: eb ssh XXXXXX --profile=xx 现在我想将一个文件复制到我的本地机器上,我该怎么做? 最佳答案 找出与 scp 一起
对于典型的 Java Web 应用程序,使用 Elastic Beanstalk 相对于手动创建 EC2 实例、设置 tomcat 服务器和部署等有哪些优势?负载平衡、监控和自动缩放是唯一的优势吗?
我有两个Elastic Search版本,一个是 7.3 ,另一个是 7.1 。我正在将flattened数据类型用于 Elastic Search 7.3 ,并且我也想在 Elastic Searc
我是 Elastic 和 spring-data-elastic 的新手。我一直在此处和网络的其他区域进行搜索,但到目前为止尚未找到我的问题的答案。我希望 SO 能够提供帮助。 我正在为我的Users
我有一个运行 PHP 的弹性 beanstalk 环境。在我的项目中,我有一个 .ebextensions 文件夹和一个名为“15-memorymonitor.config”的文件,其中包含以下内容;
我有 “更新”:Dockerrun.aws.json 中的“真” 当我更新 ECR 中的图像时,它应该自动更新 EC2 iontance 中的图像和容器。 但是当我在推送新图像后通过 ssh 进入实例
我有一个定义 Elastic Beanstalk 应用程序的 CloudFormation 模板。 我想扩展这个应用程序,即我希望端口 80 上的监听器重定向到 HTTPS。 AWS::Elastic
我在使用自定义 .ebextensions 文件部署 EB 实例时遇到问题。这是该文件中的相关部分: container_commands: 01_migrate: command: 'p
我已经使用带负载均衡器的 Elastic Beanstalk 创建了一个环境,并在各自的配置中分配了所有健康检查值 我也为ELB设置了应用健康检查url 但是当我检查自动缩放组配置时,健康检查类型是
我想使用 OpenTelemetry 将跟踪/指标数据导出到 Elastic Search,但我更愿意避免使用 Elastic APM。是否可以?opentelemetry 贡献 repo显然暗示这是
我正在尝试部署我的 角申请通过GitHub Actions到 Elastic Beanstalk 。我正在使用这个 GitHub actions用于部署到 ELB。 我的问题是,部署失败,因为 ELB
我已阅读有关 Deploying Versions with Zero Downtime 的 AWS 文档,又名 CNAME 交换。 如 yegor256在 this answer 中有解释: The
我们在我们的一个应用程序服务器上安装了 Elastic 5.6.10 和 HibernateSearch ORM 5.11.4.Final,现在我们计划通过我们的一个微服务(spring boot,但
我正在使用 AWS Elastic beanstalk 并希望为不同的环境配置不同的 ENV 变量。我发现的唯一方法是使用 ebextensions,但如果我将同一个数据包部署到多个环境,则无法覆盖在
我有一个应用程序,其中包含 nodejs 和 php 代码。 nodejs 用于运行应用程序所需的几个脚本。我如何使用 aws Elastic beanstalk 部署此类应用程序? 最佳答案 有两种
我打算将 MP4(1920x1080,比特率可能因 mp4 而异)转换为 HLS(不同类型的分辨率)。 不同类型的分辨率,我正在寻找 1080p = 1920x1080 720p = 1280x720
我不断收到以下消息。但是在我的 nginx 日志中没有任何内容表明返回的请求状态为 5xx。此外,应用程序似乎按预期工作。我可能会得到这些的任何指示? 留言: Environment health h
我们如何使用 bitbucket 管道更新 aws elastic beanstalk 上的 asp.net 核心网站? 最佳答案 我知道这是迟到的答案,但几天前我做了同样的事情,所以这里是我是如何做
我是一名优秀的程序员,十分优秀!