- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章logrus hook输出日志到本地磁盘的操作由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
logrus是go的一个日志框架,它最让人激动的应该是hook机制,可以在初始化时为logrus添加hook,logrus可以实现各种扩展功能,可以将日志输出到elasticsearch和activemq等中间件去,甚至可以输出到你的email和叮叮中去,不要问为为什么可以发现可以输入到叮叮中去,都是泪,手动笑哭! 。
言归正传,这里就简单的通过hook机制将文件输出到本地磁盘.
首先 。
go get github.com/sirupsen/logrus 。
然后 。
logrus和go lib里面一样有6个等级,可以直接调用 。
1
2
3
4
5
6
|
logrus.Debug("Useful debugging information.")
logrus.Info("Something noteworthy happened!")
logrus.Warn("You should probably take a look at this.")
logrus.Error("Something failed but I'm not quitting.")
logrus.Fatal("Bye.") //log之后会调用os.Exit(1)
logrus.Panic("I'm bailing.") //log之后会panic()
|
项目例子结构 。
main.go 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
package main
import (
"fmt"
"github.com/sirupsen/logrus"
"logT/logS"
)
func main() {
//创建一个hook,将日志存储路径输入进去
hook := logS.NewHook("d:/log/golog.log")
//加载hook之前打印日志
logrus.WithField("file", "d:/log/golog.log").Info("New logrus hook err.")
logrus.AddHook(hook)
//加载hook之后打印日志
logrus.WithFields(logrus.Fields{
"animal": "walrus",
}).Info("A walrus appears")
}
|
hook.go 。
不要看下面三个go文件代码很长,其实大多数都是固定代码,也就NewHook函数自己扩展定义就好 。
package logS 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
import (
"fmt"
"github.com/sirupsen/logrus"
"os"
"strings"
)
// Hook 写文件的Logrus Hook
type Hook struct {
W LoggerInterface
}
func NewHook(file string) (f *Hook) {
w := NewFileWriter()
config := fmt.Sprintf(`{"filename":"%s","maxdays":7}`, file)
err := w.Init(config)
if err != nil {
return nil
}
return &Hook{w}
}
// Fire 实现Hook的Fire接口
func (hook *Hook) Fire(entry *logrus.Entry) (err error) {
message, err := getMessage(entry)
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to read entry, %v", err)
return err
}
switch entry.Level {
case logrus.PanicLevel:
fallthrough
case logrus.FatalLevel:
fallthrough
case logrus.ErrorLevel:
return hook.W.WriteMsg(fmt.Sprintf("[ERROR] %s", message), LevelError)
case logrus.WarnLevel:
return hook.W.WriteMsg(fmt.Sprintf("[WARN] %s", message), LevelWarn)
case logrus.InfoLevel:
return hook.W.WriteMsg(fmt.Sprintf("[INFO] %s", message), LevelInfo)
case logrus.DebugLevel:
return hook.W.WriteMsg(fmt.Sprintf("[DEBUG] %s", message), LevelDebug)
default:
return nil
}
}
// Levels 实现Hook的Levels接口
func (hook *Hook) Levels() []logrus.Level {
return []logrus.Level{
logrus.PanicLevel,
logrus.FatalLevel,
logrus.ErrorLevel,
logrus.WarnLevel,
logrus.InfoLevel,
logrus.DebugLevel,
}
}
func getMessage(entry *logrus.Entry) (message string, err error) {
message = message + fmt.Sprintf("%s ", entry.Message)
file, lineNumber := GetCallerIgnoringLogMulti(2)
if file != "" {
sep := fmt.Sprintf("%s/src/", os.Getenv("GOPATH"))
fileName := strings.Split(file, sep)
if len(fileName) >= 2 {
file = fileName[1]
}
}
message = fmt.Sprintf("%s:%d ", file, lineNumber) + message
for k, v := range entry.Data {
message = message + fmt.Sprintf("%v:%v ", k, v)
}
return
}
|
caller.go 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
package logS
import (
"runtime"
"strings"
)
func GetCaller(callDepth int, suffixesToIgnore ...string) (file string, line int) {
// bump by 1 to ignore the getCaller (this) stackframe
callDepth++
outer:
for {
var ok bool
_, file, line, ok = runtime.Caller(callDepth)
if !ok {
file = "???"
line = 0
break
}
for _, s := range suffixesToIgnore {
if strings.HasSuffix(file, s) {
callDepth++
continue outer
}
}
break
}
return
}
// GetCallerIgnoringLogMulti TODO
func GetCallerIgnoringLogMulti(callDepth int) (string, int) {
// the +1 is to ignore this (getCallerIgnoringLogMulti) frame
return GetCaller(callDepth+1, "logrus/hooks.go", "logrus/entry.go", "logrus/logger.go", "logrus/exported.go", "asm_amd64.s")
}
|
file.go 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
|
package logS
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"strings"
"sync"
"time"
)
// RFC5424 log message levels.
const (
LevelError = iota
LevelWarn
LevelInfo
LevelDebug
)
// LoggerInterface Logger接口
type LoggerInterface interface {
Init(config string) error
WriteMsg(msg string, level int) error
Destroy()
Flush()
}
// LogWriter implements LoggerInterface.
// It writes messages by lines limit, file size limit, or time frequency.
type LogWriter struct {
*log.Logger
mw *MuxWriter
// The opened file
Filename string `json:"filename"`
Maxlines int `json:"maxlines"`
maxlinesCurlines int
// Rotate at size
Maxsize int `json:"maxsize"`
maxsizeCursize int
// Rotate daily
Daily bool `json:"daily"`
Maxdays int64 `json:"maxdays"`
dailyOpendate int
Rotate bool `json:"rotate"`
startLock sync.Mutex // Only one log can write to the file
Level int `json:"level"`
}
// MuxWriter an *os.File writer with locker.
type MuxWriter struct {
sync.Mutex
fd *os.File
}
// write to os.File.
func (l *MuxWriter) Write(b []byte) (int, error) {
l.Lock()
defer l.Unlock()
return l.fd.Write(b)
}
// SetFd set os.File in writer.
func (l *MuxWriter) SetFd(fd *os.File) {
if l.fd != nil {
_ = l.fd.Close()
}
l.fd = fd
}
// NewFileWriter create a FileLogWriter returning as LoggerInterface.
func NewFileWriter() LoggerInterface {
w := &LogWriter{
Filename: "",
Maxlines: 1000000,
Maxsize: 1 << 28, //256 MB
Daily: true,
Maxdays: 7,
Rotate: true,
Level: LevelDebug,
}
// use MuxWriter instead direct use os.File for lock write when rotate
w.mw = new(MuxWriter)
// set MuxWriter as Logger's io.Writer
w.Logger = log.New(w.mw, "", log.Ldate|log.Ltime)
return w
}
// Init file logger with json config.
// jsonconfig like:
// {
// "filename":"logs/sample.log",
// "maxlines":10000,
// "maxsize":1<<30,
// "daily":true,
// "maxdays":15,
// "rotate":true
// }
func (w *LogWriter) Init(jsonconfig string) error {
err := json.Unmarshal([]byte(jsonconfig), w)
if err != nil {
return err
}
if len(w.Filename) == 0 {
return errors.New("jsonconfig must have filename")
}
err = w.startLogger()
return err
}
// start file logger. create log file and set to locker-inside file writer.
func (w *LogWriter) startLogger() error {
fd, err := w.createLogFile()
if err != nil {
return err
}
w.mw.SetFd(fd)
err = w.initFd()
if err != nil {
return err
}
return nil
}
func (w *LogWriter) docheck(size int) {
w.startLock.Lock()
defer w.startLock.Unlock()
if w.Rotate && ((w.Maxlines > 0 && w.maxlinesCurlines >= w.Maxlines) ||
(w.Maxsize > 0 && w.maxsizeCursize >= w.Maxsize) ||
(w.Daily && time.Now().Day() != w.dailyOpendate)) {
if err := w.DoRotate(); err != nil {
fmt.Fprintf(os.Stderr, "FileLogWriter(%q): %s\n", w.Filename, err)
return
}
}
w.maxlinesCurlines++
w.maxsizeCursize += size
}
// WriteMsg write logger message into file.
func (w *LogWriter) WriteMsg(msg string, level int) error {
if level > w.Level {
return nil
}
n := 24 + len(msg) // 24 stand for the length "2013/06/23 21:00:22 [T] "
w.docheck(n)
w.Logger.Print(msg)
return nil
}
func (w *LogWriter) createLogFile() (*os.File, error) {
// Open the log file
fd, err := os.OpenFile(w.Filename, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0660)
return fd, err
}
func (w *LogWriter) initFd() error {
fd := w.mw.fd
finfo, err := fd.Stat()
if err != nil {
return fmt.Errorf("get stat err: %s", err)
}
w.maxsizeCursize = int(finfo.Size())
w.dailyOpendate = time.Now().Day()
if finfo.Size() > 0 {
content, err := ioutil.ReadFile(w.Filename)
if err != nil {
return err
}
w.maxlinesCurlines = len(strings.Split(string(content), "\n"))
} else {
w.maxlinesCurlines = 0
}
return nil
}
// DoRotate means it need to write file in new file.
// new file name like xx.log.2013-01-01.2
func (w *LogWriter) DoRotate() error {
_, err := os.Lstat(w.Filename)
if err == nil { // file exists
// Find the next available number
num := 1
fname := ""
for ; err == nil && num <= 999; num++ {
fname = w.Filename + fmt.Sprintf(".%s.%03d", time.Now().Format("2006-01-02"), num)
_, err = os.Lstat(fname)
}
// return error if the last file checked still existed
if err == nil {
return fmt.Errorf("Rotate: Cannot find free log number to rename %s", w.Filename)
}
// block Logger's io.Writer
w.mw.Lock()
defer w.mw.Unlock()
fd := w.mw.fd
_ = fd.Close()
// close fd before rename
// Rename the file to its newfound home
err = os.Rename(w.Filename, fname)
if err != nil {
return fmt.Errorf("Rotate: %s", err)
}
// re-start logger
err = w.startLogger()
if err != nil {
return fmt.Errorf("Rotate StartLogger: %s", err)
}
go w.deleteOldLog()
}
return nil
}
func (w *LogWriter) deleteOldLog() {
dir := filepath.Dir(w.Filename)
_ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) (returnErr error) {
defer func() {
if r := recover(); r != nil {
returnErr = fmt.Errorf("Unable to delete old log '%s', error: %+v", path, r)
fmt.Println(returnErr)
}
}()
if !info.IsDir() && info.ModTime().Unix() < (time.Now().Unix()-60*60*24*w.Maxdays) {
if strings.HasPrefix(filepath.Base(path), filepath.Base(w.Filename)) {
_ = os.Remove(path)
}
}
return
})
}
// Destroy destroy file logger, close file writer.
func (w *LogWriter) Destroy() {
_ = w.mw.fd.Close()
}
// Flush file logger.
// there are no buffering messages in file logger in memory.
// flush file means sync file from disk.
func (w *LogWriter) Flush() {
_ = w.mw.fd.Sync()
}
|
补充知识:golang logrus自定义hook:日志切片hook、邮件警报hook、kafkahook 。
logrus Hook 分析 。
logrus hook 接口定义很简单。如下 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package logrus
// A hook to be fired when logging on the logging levels returned from
// `Levels()` on your implementation of the interface. Note that this is not
// fired in a goroutine or a channel with workers, you should handle such
// functionality yourself if your call is non-blocking and you don't wish for
// the logging calls for levels returned from `Levels()` to block.
type Hook interface {
Levels() []Level
Fire(*Entry) error
}
// Internal type for storing the hooks on a logger instance.
type LevelHooks map[Level][]Hook
// Add a hook to an instance of logger. This is called with
// `log.Hooks.Add(new(MyHook))` where `MyHook` implements the `Hook` interface.
func (hooks LevelHooks) Add(hook Hook) {
for _, level := range hook.Levels() {
hooks[level] = append(hooks[level], hook)
}
}
// Fire all the hooks for the passed level. Used by `entry.log` to fire
// appropriate hooks for a log entry.
func (hooks LevelHooks) Fire(level Level, entry *Entry) error {
for _, hook := range hooks[level] {
if err := hook.Fire(entry); err != nil {
return err
}
}
return nil
}
|
只需实现 该结构的接口.
1
2
3
4
|
type Hook interface {
Levels() []Level
Fire(*Entry) error
}
|
就会被logrus框架遍历调用已注册的 hook 的 Fire 方法 。
获取日志实例 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
// log_hook.go
package logger
import (
"fmt"
"github.com/sirupsen/logrus"
"library/util/constant"
"os"
)
//自实现 logrus hook
func getLogger(module string) *logrus.Logger {
//实例化
logger := logrus.New()
//设置输出
logger.Out = os.Stdout
//设置日志级别
logger.SetLevel(logrus.DebugLevel)
//设置日志格式
//自定writer就行, hook 交给 lfshook
logger.AddHook(newLogrusHook(constant.GetLogPath(), module))
logger.SetFormatter(&logrus.JSONFormatter{
TimestampFormat:"2006-01-02 15:04:05",
})
return logger
}
//确保每次调用使用的文件都是唯一的。
func GetNewFieldLoggerContext(module,appField string) *logrus.Entry {
logger:= getLogger(module)
return logger.WithFields(logrus.Fields{
"app": appField,
})
}
//订阅 警告日志
func SubscribeLog(entry *logrus.Entry, subMap SubscribeMap) {
logger := entry.Logger
logger.AddHook(newSubScribeHook(subMap))
fmt.Println("日志订阅成功")
}
|
constant.GetLogPath() 可以替换为自己的日志文件输出目录地址,比如我的mac上则是:/usr/local/log ,直接替换即可.
日志切片hook 。
代码 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
// writer.go
package logger
import (
"fmt"
"github.com/pkg/errors"
"io"
"library/util"
"os"
"path/filepath"
"sync"
"time"
)
type LogWriter struct {
logDir string //日志根目录地址。
module string //模块 名
curFileName string //当前被指定的filename
curBaseFileName string //在使用中的file
turnCateDuration time.Duration
mutex sync.RWMutex
outFh *os.File
}
func (w *LogWriter) Write(p []byte) (n int, err error) {
w.mutex.Lock()
defer w.mutex.Unlock()
if out, err:= w.getWriter(); err!=nil {
return 0, errors.New("failed to fetch target io.Writer")
}else{
return out.Write(p)
}
}
func (w *LogWriter) getFileName() string {
base := time.Now().Truncate(w.turnCateDuration)
return fmt.Sprintf("%s/%s/%s_%s", w.logDir, base.Format("2006-01-02"), w.module, base.Format("15"))
}
func (w *LogWriter) getWriter()(io.Writer, error) {
fileName := w.curBaseFileName
//判断是否有新的文件名
//会出现新的文件名
baseFileName := w.getFileName()
if baseFileName != fileName {
fileName = baseFileName
}
dirname := filepath.Dir(fileName)
if err := os.MkdirAll(dirname, 0755); err != nil {
return nil, errors.Wrapf(err, "failed to create directory %s", dirname)
}
fileHandler, err := os.OpenFile(fileName, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
return nil, errors.Errorf("failed to open file %s", err)
}
w.outFh.Close()
w.outFh = fileHandler
w.curBaseFileName = fileName
w.curFileName = fileName
return fileHandler, nil
}
func New(logPath, module string, duration time.Duration) *LogWriter {
return &LogWriter{
logDir: logPath,
module: module,
turnCateDuration:duration,
curFileName: "",
curBaseFileName: "",
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
// hook.go
package logger
import (
"github.com/rifflock/lfshook"
"github.com/sirupsen/logrus"
"time"
)
func newLogrusHook(logPath, moduel string) logrus.Hook {
logrus.SetLevel(logrus.WarnLevel)
writer := New(logPath, moduel, time.Hour * 2)
lfsHook := lfshook.NewHook(lfshook.WriterMap{
logrus.DebugLevel: writer,
logrus.InfoLevel: writer,
logrus.WarnLevel: writer,
logrus.ErrorLevel: writer,
logrus.FatalLevel: writer,
logrus.PanicLevel: writer,
}, &logrus.TextFormatter{DisableColors: true})
// writer 生成新的log文件类型 writer 在通过new hook函数 消费 fire 函数
// writer 是实现了writer 接口的库,在日志调用write是做预处理
return lfsHook
}
|
测试代码 。
1
2
3
4
|
func TestGetLogger(t *testing.T) {
lg := GetNewFieldLoggerContext("test","d")
lg.Logger.Info("????")
}
|
解析 。
logger实例持有了 自定义的 io.writer 结构体,在消费Fire函数时,会调用Write方法,此时通过Truncate时间切片函数逻辑判断需要写入的文件。或创建新的文件.
注: 文章提供的代码是按天切分文件夹的,文件夹内模块日志再按2小时切分。可自行替换成按模块切分.
邮件警报hook 。
代码 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
// subscribeHook.go
package logger
import (
"fmt"
"github.com/sirupsen/logrus"
"library/email"
"strings"
)
type SubscribeMap map[logrus.Level][]*email.Receiver
type SubscribeHook struct {
subMap SubscribeMap
}
//此处可以自实现hook 目前使用三方hook
func(h *SubscribeHook)Levels() []logrus.Level{
return logrus.AllLevels
}
func(h *SubscribeHook)Fire(entry *logrus.Entry) error{
for level, receivers := range h.subMap {
//命中 准备消费
if level == entry.Level {
if len(receivers) > 0 {
email.SendEmail(receivers, fmt.Sprintf("%s:[系统日志警报]", entry.Level.String()),
fmt.Sprintf("错误内容: %s",entry.Message))
}
}
}
return nil
}
func NewSubscribeMap(level logrus.Level, receiverStr string) SubscribeMap{
subMap := SubscribeMap{}
addressList := strings.Split(receiverStr,";")
var receivers []*email.Receiver
for _, address := range addressList {
receivers = append(receivers, &email.Receiver{Email: address})
}
subMap[level] = receivers
return subMap
}
func newSubScribeHook(subMap SubscribeMap) *SubscribeHook {
return &SubscribeHook{subMap}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
// email.go
package email
import (
"fmt"
"gopkg.in/gomail.v2"
"regexp"
"strconv"
)
type Sender struct {
User string
Password string
Host string
Port int
MailTo []string
Subject string
Content string
}
type Receiver struct {
Email string
}
func (r *Receiver) Check() bool {
pattern := `\w+([-+.]\w+)*@\w+([-.]\w+)*\.\w+([-.]\w+)*` //匹配电子邮箱
reg := regexp.MustCompile(pattern)
return reg.MatchString(r.Email)
}
func (s *Sender) clean (){
}
//检查 邮箱正确性
func (s *Sender)NewReceiver(email string) *Receiver {
rec := &Receiver{Email:email}
if rec.Check() {
m.MailTo = []string{email}
return rec
}else{
fmt.Printf("email check fail 【%s】\n", email)
return nil
}
}
func (s *Sender)NewReceivers(receivers []*Receiver) {
for _, rec := range receivers {
if rec.Check() {
m.MailTo = append(m.MailTo, rec.Email)
}else{
fmt.Printf("email check fail 【%s】\n", rec.Email)
}
}
}
// 163邮箱 password 为开启smtp后给的秘钥
var m = Sender{User:"6666666@163.com", Password:"666666666", Host: "smtp.163.com", Port: 465}
func SendEmail(receivers []*Receiver,subject, content string){
m.NewReceivers(receivers)
m.Subject = subject
m.Content = content
e := gomail.NewMessage()
e.SetHeader("From", e.FormatAddress(m.User, "hengsheng"))
e.SetHeader("To", m.MailTo...) //发送给多个用户
e.SetHeader("Subject", m.Subject) //设置邮件主题
e.SetBody("text/html", m.Content) //设置邮件正文
d := gomail.NewDialer(m.Host, m.Port, m.User, m.Password)
err := d.DialAndSend(e)
if err != nil {
fmt.Printf("error 邮件发送错误! %s \n", err.Error())
}
}
|
使用 。
同理在writer时 如果是错误日志则发送邮件.
1
2
3
4
5
|
o.logger = logger.GetNewFieldLoggerContext("test", "666")
if subscribeSocket {
logger.SubscribeLog(o.Logger, logger.NewSubscribeMap(logrus.ErrorLevel, "a@163.com;b@163.com"))
}
// o 为实际结构体实例
|
kafkahook 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
// kafka hook
package logger
import (
"github.com/sirupsen/logrus"
"library/kafka"
"library/util/constant"
)
type KafKaHook struct {
kafkaProducer *kafka.KafkaProducer
}
func(h *KafKaHook)Levels() []logrus.Level{
return logrus.AllLevels
}
func(h *KafKaHook)Fire(entry *logrus.Entry) error{
h.kafkaProducer.SendMsgSync(entry.Message)
return nil
}
func newKafkaHook() *KafKaHook{
producer := kafka.NewKafkaProducer(constant.KafkaLogElkTopic,true)
return &KafKaHook{kafkaProducer: producer}
}
|
使用时logger.AddHook(newKafkaHook()) 即可 。
kafka模块 。
生产者 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
// kafkaProducer.go
package kafka
import (
"errors"
"fmt"
"github.com/Shopify/sarama"
"library/util/constant"
"log"
"time"
)
func GetKafkaAddress()[]string{
return "127.0.0.1:9092"
}
//同步消息模式
func SyncProducer(topic, message string) error {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Timeout = 5 * time.Second
p, err := sarama.NewSyncProducer(GetKafkaAddress(), config)
if err != nil {
return errors.New(fmt.Sprintf("sarama.NewSyncProducer err, message=%s \n", err))
}
defer p.Close()
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(message),
}
part, offset, err := p.SendMessage(msg)
if err != nil {
return errors.New(fmt.Sprintf("send sdsds err=%s \n", err))
} else {
fmt.Printf("发送成功,partition=%d, offset=%d \n", part, offset)
return nil
}
}
//async 异步生产者
type KafkaProducer struct {
topic string
asyncProducer *sarama.AsyncProducer
syncProducer *sarama.SyncProducer
sync bool
}
func NewKafkaProducer(topic string, sync bool) *KafkaProducer {
k := &KafkaProducer{
topic: topic,
sync: sync,
}
if sync {
k.initSync()
}else{
k.initAsync()
}
return k
}
func (k *KafkaProducer) initAsync() bool {
if k.sync {
fmt.Printf("sync producer cant call async func !\n")
return false
}
config := sarama.NewConfig()
//等待服务器所有副本都保存成功后的响应
config.Producer.RequiredAcks = sarama.WaitForAll
//随机向partition发送消息
config.Producer.Partitioner = sarama.NewRandomPartitioner
//是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用.
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
//设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置
//注意,版本设置不对的话,kafka会返回很奇怪的错误,并且无法成功发送消息
config.Version = sarama.V0_10_0_1
producer, e := sarama.NewAsyncProducer(GetKafkaAddress(), config)
if e != nil {
fmt.Println(e)
return false
}
k.asyncProducer = &producer
defer producer.AsyncClose()
pd := *k.asyncProducer
go func() {
for{
select {
case <-pd.Successes():
//fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
case fail := <-pd.Errors():
fmt.Printf("err: %s \n", fail.Err.Error())
}
}
}()
return true
}
func (k *KafkaProducer) initSync() bool {
if !k.sync {
fmt.Println("async producer cant call sync func !")
return false
}
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Timeout = 5 * time.Second
p, err := sarama.NewSyncProducer(GetKafkaAddress(), config)
k.syncProducer = &p
if err != nil {
log.Printf("sarama.NewSyncProducer err, message=%s \n", err)
return false
}
return true
}
func (k *KafkaProducer) SendMsgAsync(sendStr string) {
msg := &sarama.ProducerMessage{
Topic: k.topic,
}
//将字符串转化为字节数组
msg.Value = sarama.ByteEncoder(sendStr)
//fmt.Println(value)
//使用通道发送
pd := *k.asyncProducer
pd.Input() <- msg
}
func (k *KafkaProducer) SendMsgSync(sendStr string) bool {
msg := &sarama.ProducerMessage{
Topic: k.topic,
Value: sarama.ByteEncoder(sendStr),
}
pd := *k.syncProducer
part, offset, err := pd.SendMessage(msg)
if err != nil {
fmt.Printf("发送失败 send message(%s) err=%s \n", sendStr, err)
return false
} else {
fmt.Printf("发送成功 partition=%d, offset=%d \n", part, offset)
return true
}
}
|
调用 SendMsgSync 或 SendMsgAsync 生产消息,注意初始化时的参数要保证一致! 。
消费者组 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
// kafkaConsumerGroup.go
package kafka
import (
"context"
"fmt"
"github.com/Shopify/sarama"
"log"
"sync"
)
func NewKafkaConsumerGroup(topics []string, group string, businessCall func(message *sarama.ConsumerMessage) bool) *KafkaConsumerGroup {
k := &KafkaConsumerGroup{
brokers: GetKafkaAddress(),
topics: topics,
group: group,
channelBufferSize: 2,
ready: make(chan bool),
version: "1.1.1",
handler: businessCall,
}
k.Init()
return k
}
// 消费者组(consumer group): 相同的group.id的消费者将视为同一个消费者组,
// 每个消费者都需要设置一个组id, 每条消息只能被 consumer group 中的一个
// Consumer 消费,但可以被多个 consumer group 消费
type KafkaConsumerGroup struct {
//代理(broker): 一台kafka服务器称之为一个broker
brokers []string
//主题(topic): 消息的一种逻辑分组,用于对消息分门别类,每一类消息称之为一个主题,相同主题的消息放在一个队列中
topics []string
version string
ready chan bool
group string
channelBufferSize int
//业务调用
handler func(message *sarama.ConsumerMessage) bool
}
func (k *KafkaConsumerGroup)Init() func() {
version,err := sarama.ParseKafkaVersion(k.version)
if err!=nil{
fmt.Printf("Error parsing Kafka version: %v", err)
}
cfg := sarama.NewConfig()
cfg.Version = version
// 分区分配策略
cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
// 未找到组消费位移的时候从哪边开始消费
cfg.Consumer.Offsets.Initial = -2
// channel长度
cfg.ChannelBufferSize = k.channelBufferSize
ctx, cancel := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(k.brokers, k.group, cfg)
if err != nil {
fmt.Printf("Error creating consumer group client: %v", err)
}
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer func() {
wg.Done()
//util.HandlePanic("client.Consume panic", log.StandardLogger())
}()
for {
if err := client.Consume(ctx, k.topics, k); err != nil {
log.Printf("Error from consumer: %v", err)
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
log.Println(ctx.Err())
return
}
k.ready = make(chan bool)
}
}()
<-k.ready
fmt.Printf("Sarama consumer up and running!... \n")
// 保证在系统退出时,通道里面的消息被消费
return func() {
cancel()
wg.Wait()
if err = client.Close(); err != nil {
fmt.Printf("Error closing client: %v \n", err)
}
}
}
// Setup is run at the beginning of a new session, before ConsumeClaim
func (k *KafkaConsumerGroup) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(k.ready)
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (k *KafkaConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (k *KafkaConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
// 具体消费消息
for message := range claim.Messages() {
//msg := string(message.Value)
//k.logger.Infof("卡夫卡: %s", msg)
if ok:= k.handler(message); ok {
// 更新位移
session.MarkMessage(message, "")
}
//run.Run(msg)
}
return nil
}
|
测试代码 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
func TestKafkaConsumerGroup_Init(t *testing.T) {
//pd := NewKafkaProducer("test-fail",true)
//pd.InitSync()
k := NewKafkaConsumerGroup([]string{constant.KafkaALiSdkTopic}, "group-2", func(message *sarama.ConsumerMessage) bool {
fmt.Println(string(message.Value))
//如果失败的处理逻辑
//if ok := pd.SendMsgSync("666666"); ok {
// return true
//}
return false
})
consumerDone := k.Init()
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-sigterm:
fmt.Println("terminating: via signal")
}
consumerDone()
}
|
这里有一些补偿逻辑在里面.
以上就是logrus相关hook.
好了,这篇logrus hook输出日志到本地磁盘的操作就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我.
原文链接:https://noshoes.blog.csdn.net/article/details/82909121 。
最后此篇关于logrus hook输出日志到本地磁盘的操作的文章就讲到这里了,如果你想了解更多关于logrus hook输出日志到本地磁盘的操作的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我正在使用 OUTFILE 命令,但由于权限问题和安全风险,我想将 shell 的输出转储到文件中,但出现了一些错误。我试过的 #This is a simple shell to connect t
我刚刚开始学习 Java,我想克服在尝试为这个“问题”创建 Java 程序时出现的障碍。这是我必须创建一个程序来解决的问题: Tandy 喜欢分发糖果,但只有 n 颗糖果。对于她给第 i 个糖果的人,
你好,我想知道我是否可以得到一些帮助来解决我在 C++ 中打印出 vector 内容的问题 我试图以特定顺序在一个或两个函数调用中输出一个类的所有变量。但是我在遍历 vector 时收到一个奇怪的错误
我正在将 intellij (2019.1.1) 用于 java gradle (5.4.1) 项目,并使用 lombok (1.18.6) 来自动生成代码。 Intellij 将生成的源放在 out
编辑:在与 guest271314 交流后,我意识到问题的措辞(在我的问题正文中)可能具有误导性。我保留了旧版本并更好地改写了新版本 背景: 从远程服务器获取 JSON 时,响应 header 包含一
我的问题可能有点令人困惑。我遇到的问题是我正在使用来自 Java 的 StoredProcedureCall 调用过程,例如: StoredProcedureCall call = new Store
在我使用的一些IDL中,我注意到在方法中标记返回值有2个约定-[in, out]和[out, retval]。 当存在多个返回值时,似乎使用了[in, out],例如: HRESULT MyMetho
当我查看 gar -h 的帮助输出时,它告诉我: [...] gar: supported targets: elf64-x86-64 elf32-i386 a.out-i386-linux [...
我想循环遍历一个列表,并以 HTML 格式打印其中的一部分,以代码格式打印其中的一部分。所以更准确地说:我想产生与这相同的输出 1 is a great number 2 is a great
我有下面的tekton管道,并尝试在Google Cloud上运行。集群角色绑定。集群角色。该服务帐户具有以下权限。。例外。不确定需要为服务帐户设置什么权限。
当尝试从 make 过滤非常长的输出以获取特定警告或错误消息时,第一个想法是这样的: $ make | grep -i 'warning: someone set up us the bomb' 然而
我正在创建一个抽象工具类,该类对另一组外部类(不受我控制)进行操作。外部类在某些接口(interface)点概念上相似,但访问它们相似属性的语法不同。它们还具有不同的语法来应用工具操作的结果。我创建了
这个问题已经有答案了: What do numbers starting with 0 mean in python? (9 个回答) 已关闭 7 年前。 在我的代码中使用按位与运算符 (&) 时,我
我写了这段代码来解析输入文件中的行输入格式:电影 ID 可以有多个条目,所以我们应该计算平均值输出:**没有重复(这是问题所在) import re f = open("ratings2.txt",
我需要处理超过 1000 万个光谱数据集。数据结构如下:大约有 1000 个 .fits(.fits 是某种数据存储格式)文件,每个文件包含大约 600-1000 个光谱,其中每个光谱中有大约 450
我编写了一个简单的 C 程序,它读取一个文件并生成一个包含每个单词及其出现频率的表格。 该程序有效,我已经能够在 Linux 上运行的终端中获得显示的输出,但是,我不确定如何获得生成的显示以生成包含词
很难说出这里要问什么。这个问题模棱两可、含糊不清、不完整、过于宽泛或夸夸其谈,无法以目前的形式得到合理的回答。如需帮助澄清此问题以便重新打开,visit the help center . 关闭 1
1.普通的输出: print(str)#str是任意一个字符串,数字··· 2.格式化输出: ?
我无法让 logstash 正常工作。 Basic logstash Example作品。但后来我与 Advanced Pipeline Example 作斗争.也许这也可能是 Elasticsear
这是我想要做的: 我想让用户给我的程序一些声音数据(通过麦克风输入),然后保持 250 毫秒,然后通过扬声器输出。 我已经使用 Java Sound API 做到了这一点。问题是它有点慢。从发出声音到
我是一名优秀的程序员,十分优秀!