- mongodb - 在 MongoDB mapreduce 中,如何展平值对象?
- javascript - 对象传播与 Object.assign
- html - 输入类型 ="submit"Vs 按钮标签它们可以互换吗?
- sql - 使用 MongoDB 而不是 MS SQL Server 的优缺点
我有 Spring Boot 2 网络应用程序,我需要在其中通过 cookie 识别网站访问者并收集页面查看统计信息。所以我需要拦截每个网络请求。我必须编写的代码比回调 hell 更复杂(Spring reactor 应该解决的问题)。
代码如下:
package mypack.conf;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
import org.springframework.http.HttpCookie;
import org.springframework.http.ResponseCookie;
import org.springframework.web.reactive.config.ResourceHandlerRegistry;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import mypack.dao.PageViewRepository;
import mypack.dao.UserRepository;
import mypack.domain.PageView;
import mypack.domain.User;
import mypack.security.JwtProvider;
import reactor.core.publisher.Mono;
@Configuration
@ComponentScan(basePackages = "mypack")
@EnableReactiveMongoRepositories(basePackages = "mypack")
public class WebConfig implements WebFluxConfigurer {
@Autowired
@Lazy
private UserRepository userRepository;
@Autowired
@Lazy
private PageViewRepository pageViewRepository;
@Autowired
@Lazy
JwtProvider jwtProvider;
@Bean
public WebFilter sampleWebFilter() {
return new WebFilter() {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
String uri = exchange.getRequest().getURI().toString();
String path = exchange.getRequest().getPath().pathWithinApplication().value();
HttpCookie cookie = null;
String token = "";
Map<String, List<HttpCookie>> cookies = exchange.getRequest().getCookies();
try {
if((exchange.getRequest().getCookies().containsKey("_token") )
&& (exchange.getRequest().getCookies().getFirst("_token"))!=null ) {
cookie = exchange.getRequest().getCookies().getFirst("_token");
token = cookie.getValue();
return userRepository.findByToken(token).map(user -> {
exchange.getAttributes().put("_token", user.getToken());
PageView pg = PageView.builder().createdDate(LocalDateTime.now()).URL(uri).build();
pageViewRepository.save(pg).subscribe(pg1 -> {user.getPageviews().add(pg1); });
userRepository.save(user).subscribe();
return user;
})
.flatMap(user-> chain.filter(exchange)); // ultimately this step executes regardless user exist or not
// handle case when brand new user first time visits website
} else {
token = jwtProvider.genToken("guest", UUID.randomUUID().toString());
User user = User.builder().createdDate(LocalDateTime.now()).token(token).emailId("guest").build();
userRepository.save(user).subscribe();
exchange.getResponse().getCookies().remove("_token");
ResponseCookie rcookie = ResponseCookie.from("_token", token).httpOnly(true).build();
exchange.getResponse().addCookie(rcookie);
exchange.getAttributes().put("_token", token);
}
} catch (Exception e) {
e.printStackTrace();
}
return chain.filter(exchange);
} // end of Mono<Void> filter method
}; // end of New WebFilter (anonymous class)
}
}
其他相关类:
@Repository
public interface PageViewRepository extends ReactiveMongoRepository<PageView, String>{
Mono<PageView> findById(String id);
}
@Repository
public interface UserRepository extends ReactiveMongoRepository<User, String>{
Mono<User> findByToken(String token);
}
@Data
@AllArgsConstructor
@Builder
@NoArgsConstructor
public class User {
@Id
private String id;
private String token;
@Default
private LocalDateTime createdDate = LocalDateTime.now();
@DBRef
private List<PageView> pageviews;
}
Data
@Document
@Builder
public class PageView {
@Id
private String id;
private String URL;
@Default
private LocalDateTime createdDate = LocalDateTime.now();
}
gradle文件相关部分:
buildscript {
ext {
springBootVersion = '2.0.1.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
dependencies {
compile('org.springframework.boot:spring-boot-starter-data-mongodb-reactive')
compile('org.springframework.boot:spring-boot-starter-security')
compile('org.springframework.boot:spring-boot-starter-thymeleaf')
compile('org.springframework.boot:spring-boot-starter-webflux')
compile('org.springframework.security:spring-security-oauth2-client')
compile('org.springframework.security.oauth:spring-security-oauth2:2.3.4.RELEASE')
runtime('org.springframework.boot:spring-boot-devtools')
compileOnly('org.projectlombok:lombok')
compile "org.springframework.security:spring-security-jwt:1.0.9.RELEASE"
compile "io.jsonwebtoken:jjwt:0.9.0"
testCompile('org.springframework.boot:spring-boot-starter-test')
testCompile('io.projectreactor:reactor-test')
compile('com.fasterxml.jackson.core:jackson-databind')
}
问题出在以下几行:
PageView pg = PageView.builder().createdDate(LocalDateTime.now()).URL(uri).build(); pageViewRepository.save(pg).subscribe(pg1 -> {user.getPageviews().add(pg1); });
挂起浏览器(一直等待响应)。
基本上我想要的是这样的:不得使用甚至在 webfilter 代码中都不起作用的 block() 作为 block 挂起浏览器。将浏览量保存在 mongo db 中。保存后,pageview 具有有效的 mongodb id,需要将其作为引用存储在用户实体的 pageviews 列表中。因此只有保存到数据库后,下一步就是更新用户的浏览量列表。下一步是在不影响下游 Controller 方法的情况下保存更新的用户,这些方法也可能更新用户并且可能也需要保存用户。所有这些都应该在给定的 WebFilter 上下文中工作。
如何解决这个问题?
提供的解决方案必须确保在传递给 Controller 操作之前将用户保存在 webfilter 中,其中一些操作还使用来自查询字符串参数的不同值保存用户。
最佳答案
如果我理解正确,您需要对数据库异步执行长时间操作以防止过滤器(以及请求本身)阻塞?
在这种情况下,我会推荐以下适合我的解决方案:
@Bean
public WebFilter filter() {
return (exchange, chain) -> {
ServerHttpRequest req = exchange.getRequest();
String uri = req.getURI().toString();
log.info("[i] Got request: {}", uri);
var headers = req.getHeaders();
List<String> tokenList = headers.get("token");
if (tokenList != null && tokenList.get(0) != null) {
String token = tokenList.get(0);
log.info("[i] Find a user by token {}", token);
return userRepo.findByToken(token)
.map(user -> process(exchange, uri, token, user))
.then(chain.filter(exchange));
} else {
String token = UUID.randomUUID().toString();
log.info("[i] Create a new user with token {}", token);
return userRepo.save(new User(token))
.map(user -> process(exchange, uri, token, user))
.then(chain.filter(exchange));
}
};
}
在这里,我稍微更改了您的逻辑,并从适当的 header (而不是 cookie)中获取 token 值,以简化我的实现。
因此,如果 token 存在,那么我们会尝试找到它的用户。如果 token 不存在,那么我们创建一个新用户。如果成功找到或创建用户,则 process
方法正在调用。之后,无论结果如何,我们都返回 chain.filter(exchange)
.
方法process
将 token 值放入请求的适当属性并异步调用方法updateUserStat
的userService
:
private User process(ServerWebExchange exchange, String uri, String token, User user) {
exchange.getAttributes().put("_token", token);
userService.updateUserStat(uri, user); // async call
return user;
}
用户服务:
@Slf4j
@Service
public class UserService {
private final UserRepo userRepo;
private final PageViewRepo pageViewRepo;
public UserService(UserRepo userRepo, PageViewRepo pageViewRepo) {
this.userRepo = userRepo;
this.pageViewRepo = pageViewRepo;
}
@SneakyThrows
@Async
public void updateUserStat(String uri, User user) {
log.info("[i] Start updating...");
Thread.sleep(1000);
pageViewRepo.save(new PageView(uri))
.flatMap(user::addPageView)
.blockOptional()
.ifPresent(u -> userRepo.save(u).block());
log.info("[i] User updated.");
}
}
出于测试目的,我在此处添加了一个小延迟,以确保无论此方法的持续时间如何,请求都能毫无延迟地工作。
用户被token发现的情况:
2019-01-06 18:25:15.442 INFO 4992 --- [ctor-http-nio-3] : [i] Got request: http://localhost:8080/users?test=1000
2019-01-06 18:25:15.443 INFO 4992 --- [ctor-http-nio-3] : [i] Find a user by token 84b0f7ec-670c-4c04-8a7c-b692752d7cfa
2019-01-06 18:25:15.444 DEBUG 4992 --- [ctor-http-nio-3] : Created query Query: { "token" : "84b0f7ec-670c-4c04-8a7c-b692752d7cfa" }, Fields: { }, Sort: { }
2019-01-06 18:25:15.445 DEBUG 4992 --- [ctor-http-nio-3] : find using query: { "token" : "84b0f7ec-670c-4c04-8a7c-b692752d7cfa" } fields: Document{{}} for class: class User in collection: user
2019-01-06 18:25:15.457 INFO 4992 --- [ntLoopGroup-2-2] : [i] Get all users...
2019-01-06 18:25:15.457 INFO 4992 --- [ task-3] : [i] Start updating...
2019-01-06 18:25:15.458 DEBUG 4992 --- [ntLoopGroup-2-2] : find using query: { } fields: Document{{}} for class: class User in collection: user
2019-01-06 18:25:16.459 DEBUG 4992 --- [ task-3] : Inserting Document containing fields: [URL, createdDate, _class] in collection: pageView
2019-01-06 18:25:16.476 DEBUG 4992 --- [ task-3] : Saving Document containing fields: [_id, token, pageViews, _class]
2019-01-06 18:25:16.479 INFO 4992 --- [ task-3] : [i] User updated.
这里我们可以看到更新用户是在独立的task-3
中进行的用户已经获得“获取所有用户”请求的结果之后的线程。
token 不存在且用户已创建的情况:
2019-01-06 18:33:54.764 INFO 4992 --- [ctor-http-nio-3] : [i] Got request: http://localhost:8080/users?test=763
2019-01-06 18:33:54.764 INFO 4992 --- [ctor-http-nio-3] : [i] Create a new user with token d9bd40ea-b869-49c2-940e-83f1bf79e922
2019-01-06 18:33:54.765 DEBUG 4992 --- [ctor-http-nio-3] : Inserting Document containing fields: [token, _class] in collection: user
2019-01-06 18:33:54.776 INFO 4992 --- [ntLoopGroup-2-2] : [i] Get all users...
2019-01-06 18:33:54.777 INFO 4992 --- [ task-4] : [i] Start updating...
2019-01-06 18:33:54.777 DEBUG 4992 --- [ntLoopGroup-2-2] : find using query: { } fields: Document{{}} for class: class User in collection: user
2019-01-06 18:33:55.778 DEBUG 4992 --- [ task-4] : Inserting Document containing fields: [URL, createdDate, _class] in collection: pageView
2019-01-06 18:33:55.792 DEBUG 4992 --- [ task-4] : Saving Document containing fields: [_id, token, pageViews, _class]
2019-01-06 18:33:55.795 INFO 4992 --- [ task-4] : [i] User updated.
token 存在但找不到用户的情况:
2019-01-06 18:35:40.970 INFO 4992 --- [ctor-http-nio-3] : [i] Got request: http://localhost:8080/users?test=150
2019-01-06 18:35:40.970 INFO 4992 --- [ctor-http-nio-3] : [i] Find a user by token 184b0f7ec-670c-4c04-8a7c-b692752d7cfa
2019-01-06 18:35:40.972 DEBUG 4992 --- [ctor-http-nio-3] : Created query Query: { "token" : "184b0f7ec-670c-4c04-8a7c-b692752d7cfa" }, Fields: { }, Sort: { }
2019-01-06 18:35:40.972 DEBUG 4992 --- [ctor-http-nio-3] : find using query: { "token" : "184b0f7ec-670c-4c04-8a7c-b692752d7cfa" } fields: Document{{}} for class: class User in collection: user
2019-01-06 18:35:40.977 INFO 4992 --- [ntLoopGroup-2-2] : [i] Get all users...
2019-01-06 18:35:40.978 DEBUG 4992 --- [ntLoopGroup-2-2] : find using query: { } fields: Document{{}} for class: class User in collection: user
我的演示项目:sb-reactive-filter-demo
关于spring - 如何在 Spring Reactor Web 应用程序中执行一系列操作并确保一个操作在下一个操作之前完成?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53651090/
我正在尝试找出我们应该在下一个项目中使用 Akka 还是 Reactor。最重要的问题之一是 future 选择的框架是否会提供远程处理。正如我所看到的,Akka 以我们想要的方式提供了这个。 在 G
假设我有一个返回 Mono 的 repository.save(..) 方法。 还可以说我有一个 repository.findByEmail(..) 它返回一个 Mono。 问题: 我希望第一个 M
有一个像下面这样的异步发布者,Project Reactor 有没有办法等到整个流完成处理? 当然,不必添加一个未知持续时间的 sleep ...... @Test public void group
我创建了一个简单的 Kafka 消费者,它返回 Flux对象(收到的消息),我正在尝试使用 StepVerifier 对其进行测试. 在我的测试中,我做了这样的事情: Flux flux = cons
我目前正在研究resilience4j 库,由于某种原因,以下代码无法按预期工作: @Test public void testRateLimiterProjectReactor() { //
关闭。这个问题是opinion-based 。目前不接受答案。 想要改进这个问题吗?更新问题,以便 editing this post 可以用事实和引文来回答它。 . 已关闭 7 年前。 Improv
当多个 onErrorContinue添加到管道 处理从 flatMap 抛出的特定类型的异常 ,异常处理没有按预期工作。 我希望下面的代码应该删除元素 1 到 6,而订阅者应该使用元素 7 到 10
我有一个 NettyServerCustomizer,下一个代码: @Override public HttpServer apply(final HttpServer httpServer)
我们正在评估 reactor 库以便在我们的项目中使用它。我们的项目得到了 spring 上下文的支持。因此,我们需要一个工具来构建具有 spring 支持的事件驱动应用程序。 此外,我们的主要关注领
不管上游的完整性如何,有没有办法强制 groupBy() 生成的 Flux 在一段时间后完成(或类似地,限制“打开”组的最大数量)?我有如下内容: Flux someFastPublisher; so
我正在使用 Spring WebClient 调用休息服务。如下所述的 post 调用代码。 Mono response = client.post()
我希望在 react 器运行后添加更多协议(protocol)和工厂。我找不到说明这是允许的文档。当我在 reactor.connectTCP 之前运行 reactor.run 时,程序会在工厂中围绕
(译者加)本文档的一些典型的名词如下: Publisher(发布者)、Subscriber(订阅者)、Subscription(订阅 n.)、subscribe(订阅 v.)。 event/signa
Project Reactor Mono 是否有运算符或一些好的方法来实现 doOnEmpty() 的行为? 我想对操作结果产生副作用(日志记录)。 这是我现在拥有的: myMono .map(v
在以下两个示例中,处理通量流的行为似乎有所不同。 示例 1: public static void main(String[] args) throws InterruptedException {
是否可以使用 LoggingMeterRegistry 以微米为单位收集项目 react 器的通量指标? 最佳答案 只需添加 LoggingMeterRegistry 就可以了到全局 Micromet
我想知道在spring web-flux中使用先前映射结果的好方法,例如 Mono.just(request) ... .flatMap(object0 -> createObject1(object
如何将具有1个元素的助焊剂转换为单声道? Flux.fromArray(arrayOf(1,2,1,1,1,2)) .distinct()
我想知道Reactor和分页的HTTP API。我有一个private fun getPage(pageNumber: Int): Mono。该资源具有“numberOfPages”字段,我想获取所有
我有一个包含多个 URL 和端口的数组。对于他们每个人,我需要发送和接收返回的内容: Flux.fromArray(trackersArray) .flatMap(tracker ->
我是一名优秀的程序员,十分优秀!