@RequestScoped
public class ManagedService {
@Resource
private ManagedExecutorService executor;
public CompletableFuture<Integer> asyncTask(final int value) {
return CompletableFuture
.supplyAsync(longTask(value, 100, null), executor)
.thenApply(i -> i + 1);
}
public CompletableFuture<Integer> asyncTaskWithException(final int value) {
return CompletableFuture
.supplyAsync(longTask(value, 100, "Planned exception"), executor)
.thenApply(i -> i + 1);
}
private Supplier<Integer> longTask(final int value,
final int taskDurationMs,
final String errorMessage) {
return () -> {
if (nonNull(errorMessage)) {
throw new RuntimeException(errorMessage);
}
try {
TimeUnit.MILLISECONDS.sleep(taskDurationMs);
} catch (InterruptedException e) {
throw new RuntimeException("Problem while waiting");
}
return value + 1;
};
}
}
Utilitário de Concorrência para o Java EE
Neste exemplo será usado o Utilitário de Concorrência para Java EE, ou JSR 236.
Esse padrão permite que os desenvolvedores de aplicativos usem utilitários de concorrência gerenciados pelo servidor de aplicação. Dessa forma, o desenvolvedor não tem mais a responsabilidade de gerenciar manualmente pesquisas de thread ou threads. Além disso, em um objeto de thread não gerenciado, o contêiner não pode garantir que outros serviços da plataforma Java EE funcionem corretamente. Por esses motivos, é recomendável o uso de threads gerenciadas sempre que a necessidade surgir. Mais informações podem ser encontradas aqui.
Principais Componentes do Utilitário de Concorrência
O padrão especifica os componentes principais do utilitário de concorrência. Em suma, esses componentes são objetos gerenciados que oferecem facilidades de concorrência. Esses objetos, uma vez que são gerenciados pelo aplicativo, podem ser injetados usando CDI ou JNDI. Mais informações podem ser encontradas aqui.
ManagedExecutorService
Um ManagedExecutorService
é um objeto que permite ao desenvolvedor do aplicativo enviar tarefas de forma assíncrona. As tarefas são executadas em threads que são gerenciadas pelo contêiner.
Exemplo
Aqui está a classe que usa um ManagedExecutorService
(código completo pode ser encontrado aqui):
O objeto ManagedExecutorService
, está sendo gerenciado, é injetado usando a anotação @Resource
.
Esse exemplo simula uma computação de longa execução, definida no método longTask
.
As capacidades do ManagedExecutorService
são exemplificadas nos métodos asyncTask
e asyncTaskWithException
.
Os dois métodos invocam o método longTask`definido acima; cada execução do `longTask
é feita em uma thread gerenciada pela aplicação.
O método asyncTask
simula uma execução bem sucedida, enquanto o asyncTaskWithException
simula uma execução que vai lançar uma exceção.
Os métodos são usados na seguinte classe de teste (exemplo completo pode ser encontrado aqui):
@RunWith(Arquillian.class)
public class ManagedServiceTest {
@Inject
private ManagedService managedService;
@Deployment()
public static final WebArchive app() {
return ShrinkWrap.create(WebArchive.class, "example.war")
.addClasses(ManagedService.class)
.addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml");
}
@Test
public void managedInvocationTest() {
final CompletableFuture<Integer> future = managedService.asyncTask(1);
try {
assertEquals(3, future.get(200, TimeUnit.MILLISECONDS).intValue());
} catch (Exception e) {
fail("Unexpected exception" + e);
}
}
@Test(expected = TimeoutException.class)
public void managedInvocationTestWithTimeout() throws InterruptedException, ExecutionException, TimeoutException {
final CompletableFuture<Integer> future = managedService.asyncTask(1);
future.get(10, TimeUnit.MILLISECONDS);
}
@Test
public void managedInvocationTestWithException() {
final CompletableFuture<Integer> future = managedService.asyncTaskWithException(1);
try {
future.get(200, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
assertEquals("Planned exception", e.getCause().getMessage());
} catch (Exception e) {
fail("Unexpected exception" + e);
}
}
}
ManagedScheduledExecutorService
Um ManagedScheduledExecutorService
é um objeto que permite que os desenvolvedores executem tarefas assíncronas em momentos específicos. As tarefas são executadas em threads iniciadas pelo contêiner.
Exemplo
Exemplo completo pode ser encontrado aqui:
@RequestScoped
public class ManagedScheduledService {
@Resource
private ManagedScheduledExecutorService executor;
public Future<Integer> singleFixedDelayTask(final int value,
final String errorMessage) {
return executor.schedule(
longCallableTask(value, 10, errorMessage), 100, TimeUnit.MILLISECONDS);
}
public ScheduledFuture<?> periodicFixedDelayTask(final int value,
final String errorMessage,
final CountDownLatch countDownLatch) {
return executor.scheduleAtFixedRate(
longRunnableTask(value, 10, errorMessage, countDownLatch), 0, 100, TimeUnit.MILLISECONDS);
}
private Runnable longRunnableTask(final int value,
final int taskDurationMs,
final String errorMessage,
final CountDownLatch countDownLatch) {
return () -> {
failOrWait(taskDurationMs, errorMessage);
Integer result = value + 1;
countDownLatch.countDown();
};
}
private Callable<Integer> longCallableTask(final int value,
final int taskDurationMs,
final String errorMessage) {
return () -> {
failOrWait(taskDurationMs, errorMessage);
return value + 1;
};
}
private void failOrWait(final int taskDurationMs,
final String errorMessage) {
if (nonNull(errorMessage)) {
throw new RuntimeException(errorMessage);
}
try {
TimeUnit.MILLISECONDS.sleep(taskDurationMs);
} catch (InterruptedException e) {
throw new RuntimeException("Problem while waiting");
}
}
}
Esse exemplo também define um método, longCallableTask
, simulando a execução de uma computação de longa duração.
O método singleFixedDelayTask
agenda uma tarefa de longa duração (chamando longCallableTask
), mas a execução vai iniciar depois de 100 ms.
O método periodicFixedDelayTask
agenda tarefas para serem executadas periodicamente, após cada 100 ms, com um delay inicial de 0.
Os metódos são usados nas seguintes classes de teste (código completo pode ser encontrado aqui):
@RunWith(Arquillian.class)
public class ManagedScheduledServiceTest {
@Inject
private ManagedScheduledService scheduledService;
@Deployment()
public static final WebArchive app() {
return ShrinkWrap.create(WebArchive.class, "example.war")
.addClasses(ManagedScheduledService.class)
.addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml");
}
@Test
public void singleFixedDelayTask() throws InterruptedException, ExecutionException, TimeoutException {
final Future<Integer> futureA = scheduledService.singleFixedDelayTask(1, null);
final Future<Integer> futureB = scheduledService.singleFixedDelayTask(50, null);
assertEquals(2, futureA.get(200, TimeUnit.MILLISECONDS).intValue());
assertEquals(51, futureB.get(200, TimeUnit.MILLISECONDS).intValue());
}
@Test
public void periodicFixedDelayTask() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(4); // execute 4 times
final ScheduledFuture<?> scheduledFuture = scheduledService.periodicFixedDelayTask(1, null, countDownLatch);
countDownLatch.await(500, TimeUnit.MILLISECONDS);
if (!scheduledFuture.isCancelled()) {
scheduledFuture.cancel(true);
}
}
@Test
public void singleFixedDelayTaskWithException() {
final Future<Integer> future = scheduledService.singleFixedDelayTask(1, "Planned exception");
try {
future.get(200, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
assertEquals("Planned exception", e.getCause().getMessage());
} catch (Exception e) {
fail("Unexpected exception" + e);
}
}
@Test
public void periodicFixedDelayTaskWithException() {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final ScheduledFuture<?> scheduledFuture = scheduledService.periodicFixedDelayTask(1, "Planned exception", countDownLatch);
try {
countDownLatch.await(200, TimeUnit.MILLISECONDS);
scheduledFuture.get(200, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
assertEquals("Planned exception", e.getCause().getMessage());
} catch (Exception e) {
fail("Unexpected exception" + e);
}
if (!scheduledFuture.isCancelled()) {
scheduledFuture.cancel(true);
}
}
}
ManagedThreadFactory
Um ManagedThreadFactory
é um objeto que permite aos desenvolvedores criar threads gerenciadas por contêiner.
Exemplo
Exemplo completo pode ser encontrado aqui:
@RequestScoped
public class ThreadFactoryService {
@Resource
private ManagedThreadFactory factory;
public void asyncTask(final LongTask longTask) throws InterruptedException {
final Thread thread = factory.newThread(longTask);
thread.setName("pretty asyncTask");
thread.start();
}
public void asyncHangingTask(final Runnable longTask) {
final Thread thread = factory.newThread(longTask);
thread.setName("pretty asyncHangingTask");
thread.start();
if (thread.isAlive()) {
thread.interrupt();
}
}
public static class LongTask implements Runnable {
private final int value;
private final long taskDurationMs;
private final CountDownLatch countDownLatch;
private int result;
private AtomicBoolean isTerminated = new AtomicBoolean(false);
public LongTask(final int value,
final long taskDurationMs,
final CountDownLatch countDownLatch) {
this.value = value;
this.taskDurationMs = taskDurationMs;
this.countDownLatch = countDownLatch;
}
public int getResult() {
return result;
}
public boolean getIsTerminated() {
return isTerminated.get();
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(taskDurationMs);
} catch (InterruptedException e) {
isTerminated.set(true);
countDownLatch.countDown();
throw new RuntimeException("Problem while waiting");
}
result = value + 1;
countDownLatch.countDown();
}
}
}
Esse exemplo define uma classe que implementa Runnable
, executando uma tarefa de longa duração no método run
.
O método asyncTask
apenas cria uma thread gerenciada (usando o ManagedThreadFactory
injetado) em seguida, a inicia.
O método asyncHangingTask
também cria uma thread gerenciada, a inicia, mas também a para.
A seguinte classe testa esses métodos (código completo pode ser encontrado aqui):
@RunWith(Arquillian.class)
public class ThreadFactoryServiceTest {
@Inject
private ThreadFactoryService factoryService;
@Deployment()
public static final WebArchive app() {
return ShrinkWrap.create(WebArchive.class, "example.war")
.addClasses(ThreadFactoryService.class)
.addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml");
}
@Test
public void asyncTask() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final LongTask longTask = new LongTask(1, 50, countDownLatch);
factoryService.asyncTask(longTask);
countDownLatch.await(200, TimeUnit.MILLISECONDS);
assertEquals(2, longTask.getResult());
}
@Test
public void asyncHangingTask() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final LongTask longTask = new LongTask(1, 1000000, countDownLatch);
factoryService.asyncHangingTask(longTask);
countDownLatch.await(200, TimeUnit.MILLISECONDS);
assertTrue(longTask.getIsTerminated());
}
}
O exemplo completo do projeto pode ser encontrado aqui.
É um projeto Maven, e todos os testes podem ser rodados executando o comando mvn clean install
.