
spring - What is the right way to manage transactionality in RxJava services?

Reposted. Author: 行者123. Updated: 2023-12-04 04:10:34

I recently started experimenting with RxJava and came across a talk by a Netflix engineer that suggested moving our business APIs to Observable APIs, for example:

public interface VideoService {
    Observable<VideoBasicInfo> createVideoBasicInfo(VideoBasicInfo videoBasic);
    Observable<VideoBasicInfo> getVideoBasicInfo(Integer videoId);
    Observable<VideoRating> getVideoRating(Integer videoId);
}

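However, I have not found anything that explains how to manage transactionality in such a service. At first I simply annotated my service implementation with @Transactional: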
@Service
@Transactional
public class VideoServiceImpl implements VideoService {

    @Autowired
    private VideoBasicInfoRepository basicInfoRepo;
    @Autowired
    private VideoRatingRepository ratingRepo;

    public Observable<VideoBasicInfo> createVideoBasicInfo(VideoBasicInfo videoBasic) {
        return Observable.create(s -> {
            // Runs only when someone subscribes, not when this method is called.
            s.onNext(basicInfoRepo.save(videoBasic));
        });
    }

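What we want is for all the code inside the Observable.create lambda (s -> { // this code }) to execute within a transaction. What actually happens, however, is:
  • The call to createVideoBasicInfo() executes transactionally and returns a cold Observable.
  • save() executes as its own atomic transaction.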
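
The ordering described in the two bullets above can be reproduced with a small plain-Java model. This is only a sketch under stated assumptions: it uses neither Spring nor RxJava, a `Supplier` stands in for the cold Observable, and `inTransaction` is a hypothetical stand-in for what the `@Transactional` proxy does around the method call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java model (assumption: no Spring, no RxJava involved).
class ColdTxBoundaryDemo {
    static final List<String> log = new ArrayList<>();

    // Hypothetical stand-in for the proxy: marks where the transaction starts and ends.
    static <T> T inTransaction(Supplier<T> body) {
        log.add("begin");
        try {
            return body.get();
        } finally {
            log.add("end");
        }
    }

    // Models createVideoBasicInfo(): it only assembles deferred work and returns it.
    static Supplier<String> createVideoBasicInfo(String title) {
        return () -> {
            log.add("save " + title);
            return title;
        };
    }

    static List<String> run() {
        log.clear();
        // The "proxy" wraps the method call, which merely builds the pipeline.
        Supplier<String> cold = inTransaction(() -> createVideoBasicInfo("intro"));
        // "Subscribing" happens afterwards, when the transaction is already over.
        cold.get();
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [begin, end, save intro]
    }
}
```

The save is logged after the transaction has ended, which mirrors why annotating the service method alone does not cover the body of the lambda.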

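Clearly this makes sense, because the Spring proxy applies to the methods of the service implementation. I have thought of a few ways to get what I actually expect, such as starting a programmatic transaction: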
    return Observable.create(s -> {
        // Open the transaction inside the lambda, i.e. at subscription time.
        VideoBasicInfo savedBasic = transactionTemplate.execute(status -> {
            VideoBasicInfo basicInfo = basicInfoRepo.save(videoBasic);
            return basicInfo;
        });
        s.onNext(savedBasic);
    });
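
To make the timing of this programmatic approach concrete, here is a minimal plain-Java sketch. It rests on the same kind of assumptions as any toy model: `execute` is a hypothetical stand-in for `TransactionTemplate.execute(...)`, and a `Supplier` plays the role of the cold Observable. It is not the Spring API itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java model (assumption: no Spring, no RxJava involved).
class DeferredTxDemo {
    static final List<String> log = new ArrayList<>();

    // Hypothetical stand-in for TransactionTemplate.execute(...).
    static <T> T execute(Supplier<T> callback) {
        log.add("begin");
        try {
            T result = callback.get();
            log.add("commit");
            return result;
        } catch (RuntimeException e) {
            log.add("rollback");
            throw e;
        }
    }

    // Nothing runs here; the transaction is opened inside the deferred body.
    static Supplier<String> createVideoBasicInfo(String title) {
        return () -> execute(() -> {
            log.add("save " + title);
            return title;
        });
    }

    static List<String> run() {
        log.clear();
        Supplier<String> cold = createVideoBasicInfo("intro");
        log.add("assembled");
        cold.get(); // "subscription": only now does the transaction start
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [assembled, begin, save intro, commit]
    }
}
```

Because the transactional wrapper sits inside the deferred body, it brackets the save at the moment of subscription rather than at the moment the service method returns.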

Is this the recommended way to manage transactions when working with a reactive API?

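Best answer

Spring Data JpaRepository methods are already marked @Transactional (on the default repository implementation), so if you only use a single one of them, you do not need to do anything special: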

    public interface PersonRepository extends JpaRepository<Person, Integer> {
    }

    @RunWith(SpringJUnit4ClassRunner.class)
    @SpringApplicationConfiguration(classes = {RepositoryConfiguration.class})
    public class PersonRepositoryTest {
        private PersonRepository personRepository;

        @Autowired
        public void setPersonRepository(PersonRepository personRepository) {
            this.personRepository = personRepository;
        }

        @Test
        public void testReactiveSavePerson() {
            Person person = new Person("Jane", "Doe");
            assertNull(person.getId()); // null before save

            // save person; subscribe() runs the lambda synchronously here
            Observable.create(s -> {
                s.onNext(personRepository.save(person));
            }).subscribe();

            // fetch from DB
            Person fetchedPerson = personRepository.findOne(person.getId());

            // should not be null
            assertNotNull(fetchedPerson);

            // should equal
            assertEquals(person.getId(), fetchedPerson.getId());
            assertEquals(person.getFirstName(), fetchedPerson.getFirstName());
        }
    }

If you need to combine several repositories into a single transaction, you can use a class like the one below:
    @Component
    public class ObservableTxFactory {

        // Note: because create(...) is final, a subclass-based (CGLIB) proxy cannot
        // override it, so it runs on the proxy instance and 'this' is the proxy.
        // The later call(...) therefore goes through the transactional proxy.
        public final <T> Observable<T> create(Observable.OnSubscribe<T> f) {
            return new ObservableTx<>(this, f);
        }

        // Invoked at subscription time, so the whole OnSubscribe body
        // runs inside one transaction.
        @Transactional
        public <T> void call(Observable.OnSubscribe<T> onSubscribe, Subscriber<? super T> subscriber) {
            onSubscribe.call(subscriber);
        }

        private static class ObservableTx<T> extends Observable<T> {

            public ObservableTx(ObservableTxFactory observableTxFactory, OnSubscribe<T> f) {
                super(new OnSubscribeDecorator<>(observableTxFactory, f));
            }
        }

        // Wraps the original OnSubscribe and routes it through the factory's call(...).
        private static class OnSubscribeDecorator<T> implements Observable.OnSubscribe<T> {

            private final ObservableTxFactory observableTxFactory;
            private final Observable.OnSubscribe<T> onSubscribe;

            OnSubscribeDecorator(final ObservableTxFactory observableTxFactory, final Observable.OnSubscribe<T> s) {
                this.onSubscribe = s;
                this.observableTxFactory = observableTxFactory;
            }

            @Override
            public void call(Subscriber<? super T> subscriber) {
                observableTxFactory.call(onSubscribe, subscriber);
            }
        }
    }
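
Why the factory's final create method matters can be illustrated with a hand-written model of a subclass proxy. This is a sketch, not Spring's actual proxy code: `Factory`, `TxProxy`, `createFinal` and `createOverridable` are hypothetical names, and the assumption is that a subclass-based proxy delegates every overridable method to a separate target object while final methods run on the proxy instance itself.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (assumption: mimics a delegating subclass proxy by hand).
class FinalMethodProxyDemo {
    static final List<String> log = new ArrayList<>();

    // Models the factory bean: one creation method is final, one is not.
    static class Factory {
        // Cannot be overridden, so it executes on whatever instance it is called on.
        final Runnable createFinal(Runnable work) {
            return () -> this.call(work);
        }

        Runnable createOverridable(Runnable work) {
            return () -> this.call(work);
        }

        // Stands in for the @Transactional call(...) method.
        void call(Runnable work) {
            work.run();
        }
    }

    // Hand-written stand-in for a proxy subclass that delegates to a target.
    static class TxProxy extends Factory {
        private final Factory target;

        TxProxy(Factory target) {
            this.target = target;
        }

        @Override
        Runnable createOverridable(Runnable work) {
            // Delegation: inside the target, 'this' is the plain, unproxied object.
            return target.createOverridable(work);
        }

        @Override
        void call(Runnable work) {
            log.add("begin");
            try {
                target.call(work);
            } finally {
                log.add("end");
            }
        }
    }

    static List<String> run(boolean useFinal) {
        log.clear();
        Factory proxy = new TxProxy(new Factory());
        Runnable deferred = useFinal
                ? proxy.createFinal(() -> log.add("save"))
                : proxy.createOverridable(() -> log.add("save"));
        deferred.run(); // "subscription"
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // prints [begin, save, end]
        System.out.println(run(false)); // prints [save]
    }
}
```

In this model only the final variant captures the proxy as `this`, so only there does the deferred work pass through the transactional `call`. With the overridable variant the deferred work ends up calling the target directly and the transaction is skipped.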

The factory bean also needs to be defined:
    @Bean
    ObservableTxFactory observableTxFactory() {
        return new ObservableTxFactory();
    }

The service:
    @Service
    public class PersonService {
        @Autowired
        PersonRepository personRepository;
        @Autowired
        ObservableTxFactory observableTxFactory;

        public Observable<Person> createPerson(String firstName, String lastName) {
            // Everything inside this lambda runs in one transaction on subscription.
            return observableTxFactory.create(s -> {
                Person p = new Person(firstName, lastName);
                s.onNext(personRepository.save(p));
            });
        }
    }

The test:
    @RunWith(SpringJUnit4ClassRunner.class)
    @SpringApplicationConfiguration(classes = {RepositoryConfiguration.class})
    public class PersonServiceTest {
        @Autowired
        PersonRepository personRepository;
        @Autowired
        ObservableTxFactory observableTxFactory;

        @Test
        public void testPersonService() {
            final PersonService service = new PersonService();
            service.personRepository = personRepository;
            service.observableTxFactory = observableTxFactory;

            final Observable<Person> personObservable = service.createPerson("John", "Doe");
            personObservable.subscribe();

            // fetch from DB
            final Person fetchedPerson = StreamSupport.stream(personRepository.findAll().spliterator(), false)
                    .filter(p -> p.getFirstName().equals("John") && p.getLastName().equals("Doe"))
                    .findFirst()
                    .get();

            // should not be null
            assertNotNull(fetchedPerson);
        }
    }

A screenshot (not reproduced here) shows the proxy.

Regarding "spring - What is the right way to manage transactionality in RxJava services?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/34250741/
