Preloader image

Neste exemplo será usado o Utilitário de Concorrência para Java EE, ou JSR 236.

Esse padrão permite que os desenvolvedores de aplicativos usem utilitários de concorrência gerenciados pelo servidor de aplicação. Dessa forma, o desenvolvedor não tem mais a responsabilidade de gerenciar manualmente pesquisas de thread ou threads. Além disso, em um objeto de thread não gerenciado, o contêiner não pode garantir que outros serviços da plataforma Java EE funcionem corretamente. Por esses motivos, é recomendável o uso de threads gerenciadas sempre que a necessidade surgir. Mais informações podem ser encontradas aqui.

Principais Componentes do Utilitário de Concorrência

O padrão especifica os componentes principais do utilitário de concorrência. Em suma, esses componentes são objetos gerenciados que oferecem facilidades de concorrência. Esses objetos, uma vez que são gerenciados pelo aplicativo, podem ser injetados usando CDI ou JNDI. Mais informações podem ser encontradas aqui.

ManagedExecutorService

Um ManagedExecutorService é um objeto que permite ao desenvolvedor do aplicativo enviar tarefas de forma assíncrona. As tarefas são executadas em threads que são gerenciadas pelo contêiner.

Exemplo

Aqui está a classe que usa um ManagedExecutorService (código completo pode ser encontrado aqui):

@RequestScoped
public class ManagedService {

    @Resource
    private ManagedExecutorService executor;

    public CompletableFuture<Integer> asyncTask(final int value) {
        return CompletableFuture
                .supplyAsync(longTask(value, 100, null), executor)
                .thenApply(i -> i + 1);
    }

    public CompletableFuture<Integer> asyncTaskWithException(final int value) {
        return CompletableFuture
                .supplyAsync(longTask(value, 100, "Planned exception"), executor)
                .thenApply(i -> i + 1);
    }

    private Supplier<Integer> longTask(final int value,
                                       final int taskDurationMs,
                                       final String errorMessage) {
        return () -> {
            if (nonNull(errorMessage)) {
                throw new RuntimeException(errorMessage);
            }

            try {
                TimeUnit.MILLISECONDS.sleep(taskDurationMs);
            } catch (InterruptedException e) {
                throw new RuntimeException("Problem while waiting");
            }
            return value + 1;
        };
    }

}

O objeto ManagedExecutorService, está sendo gerenciado, é injetado usando a anotação @Resource.

Esse exemplo simula uma computação de longa execução, definida no método longTask.

As capacidades do ManagedExecutorService são exemplificadas nos métodos asyncTask e asyncTaskWithException. Os dois métodos invocam o método longTask`definido acima; cada execução do `longTask é feita em uma thread gerenciada pela aplicação. O método asyncTask simula uma execução bem sucedida, enquanto o asyncTaskWithException simula uma execução que vai lançar uma exceção.

Os métodos são usados na seguinte classe de teste (exemplo completo pode ser encontrado aqui):

@RunWith(Arquillian.class)
public class ManagedServiceTest {

    @Inject
    private ManagedService managedService;

    @Deployment()
    public static final WebArchive app() {
        return ShrinkWrap.create(WebArchive.class, "example.war")
                .addClasses(ManagedService.class)
                .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml");
    }

    @Test
    public void managedInvocationTest() {
        final CompletableFuture<Integer> future = managedService.asyncTask(1);
        try {
            assertEquals(3, future.get(200, TimeUnit.MILLISECONDS).intValue());
        } catch (Exception e) {
            fail("Unexpected exception" + e);
        }
    }

    @Test(expected = TimeoutException.class)
    public void managedInvocationTestWithTimeout() throws InterruptedException, ExecutionException, TimeoutException {
        final CompletableFuture<Integer> future = managedService.asyncTask(1);
        future.get(10, TimeUnit.MILLISECONDS);
    }

    @Test
    public void managedInvocationTestWithException() {
        final CompletableFuture<Integer> future = managedService.asyncTaskWithException(1);

        try {
            future.get(200, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            assertEquals("Planned exception", e.getCause().getMessage());
        } catch (Exception e) {
            fail("Unexpected exception" + e);
        }
    }
}

ManagedScheduledExecutorService

Um ManagedScheduledExecutorService é um objeto que permite que os desenvolvedores executem tarefas assíncronas em momentos específicos. As tarefas são executadas em threads iniciadas pelo contêiner.

Exemplo

Exemplo completo pode ser encontrado aqui:

@RequestScoped
public class ManagedScheduledService {

    @Resource
    private ManagedScheduledExecutorService executor;

    public Future<Integer> singleFixedDelayTask(final int value,
                                                final String errorMessage) {
        return executor.schedule(
                longCallableTask(value, 10, errorMessage), 100, TimeUnit.MILLISECONDS);
    }

    public ScheduledFuture<?> periodicFixedDelayTask(final int value,
                                                     final String errorMessage,
                                                     final CountDownLatch countDownLatch) {
        return executor.scheduleAtFixedRate(
                longRunnableTask(value, 10, errorMessage, countDownLatch), 0, 100, TimeUnit.MILLISECONDS);
    }

    private Runnable longRunnableTask(final int value,
                                      final int taskDurationMs,
                                      final String errorMessage,
                                      final CountDownLatch countDownLatch) {
        return () -> {
            failOrWait(taskDurationMs, errorMessage);
            Integer result = value + 1;
            countDownLatch.countDown();
        };
    }

    private Callable<Integer> longCallableTask(final int value,
                                               final int taskDurationMs,
                                               final String errorMessage) {
        return () -> {
            failOrWait(taskDurationMs, errorMessage);
            return value + 1;
        };
    }

    private void failOrWait(final int taskDurationMs,
                            final String errorMessage) {
        if (nonNull(errorMessage)) {
            throw new RuntimeException(errorMessage);
        }
        try {
            TimeUnit.MILLISECONDS.sleep(taskDurationMs);
        } catch (InterruptedException e) {
            throw new RuntimeException("Problem while waiting");
        }
    }

}

Esse exemplo também define um método, longCallableTask, simulando a execução de uma computação de longa duração.

O método singleFixedDelayTask agenda uma tarefa de longa duração (chamando longCallableTask), mas a execução vai iniciar depois de 100 ms. O método periodicFixedDelayTask agenda tarefas para serem executadas periodicamente, após cada 100 ms, com um delay inicial de 0.

Os metódos são usados nas seguintes classes de teste (código completo pode ser encontrado aqui):

@RunWith(Arquillian.class)
public class ManagedScheduledServiceTest {

    @Inject
    private ManagedScheduledService scheduledService;

    @Deployment()
    public static final WebArchive app() {
        return ShrinkWrap.create(WebArchive.class, "example.war")
                .addClasses(ManagedScheduledService.class)
                .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml");
    }

    @Test
    public void singleFixedDelayTask() throws InterruptedException, ExecutionException, TimeoutException {
        final Future<Integer> futureA = scheduledService.singleFixedDelayTask(1, null);
        final Future<Integer> futureB = scheduledService.singleFixedDelayTask(50, null);

        assertEquals(2, futureA.get(200, TimeUnit.MILLISECONDS).intValue());
        assertEquals(51, futureB.get(200, TimeUnit.MILLISECONDS).intValue());

    }

    @Test
    public void periodicFixedDelayTask() throws InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(4); // execute 4 times
        final ScheduledFuture<?> scheduledFuture = scheduledService.periodicFixedDelayTask(1, null, countDownLatch);
        countDownLatch.await(500, TimeUnit.MILLISECONDS);
        if (!scheduledFuture.isCancelled()) {
            scheduledFuture.cancel(true);
        }
    }

    @Test
    public void singleFixedDelayTaskWithException() {
        final Future<Integer> future = scheduledService.singleFixedDelayTask(1, "Planned exception");
        try {
            future.get(200, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            assertEquals("Planned exception", e.getCause().getMessage());
        } catch (Exception e) {
            fail("Unexpected exception" + e);
        }
    }

    @Test
    public void periodicFixedDelayTaskWithException() {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final ScheduledFuture<?> scheduledFuture = scheduledService.periodicFixedDelayTask(1, "Planned exception", countDownLatch);

        try {
            countDownLatch.await(200, TimeUnit.MILLISECONDS);
            scheduledFuture.get(200, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            assertEquals("Planned exception", e.getCause().getMessage());
        } catch (Exception e) {
            fail("Unexpected exception" + e);
        }

        if (!scheduledFuture.isCancelled()) {
            scheduledFuture.cancel(true);
        }
    }

}

ManagedThreadFactory

Um ManagedThreadFactory é um objeto que permite aos desenvolvedores criar threads gerenciadas por contêiner.

Exemplo

Exemplo completo pode ser encontrado aqui:

@RequestScoped
public class ThreadFactoryService {

    @Resource
    private ManagedThreadFactory factory;

    public void asyncTask(final LongTask longTask) throws InterruptedException {
        final Thread thread = factory.newThread(longTask);
        thread.setName("pretty asyncTask");
        thread.start();
    }

    public void asyncHangingTask(final Runnable longTask) {
        final Thread thread = factory.newThread(longTask);
        thread.setName("pretty asyncHangingTask");
        thread.start();

        if (thread.isAlive()) {
            thread.interrupt();
        }
    }

    public static class LongTask implements Runnable {
        private final int value;
        private final long taskDurationMs;
        private final CountDownLatch countDownLatch;
        private int result;
        private AtomicBoolean isTerminated = new AtomicBoolean(false);

        public LongTask(final int value,
                        final long taskDurationMs,
                        final CountDownLatch countDownLatch) {
            this.value = value;
            this.taskDurationMs = taskDurationMs;
            this.countDownLatch = countDownLatch;
        }

        public int getResult() {
            return result;
        }

        public boolean getIsTerminated() {
            return isTerminated.get();
        }

        @Override
        public void run() {
            try {
                TimeUnit.MILLISECONDS.sleep(taskDurationMs);
            } catch (InterruptedException e) {
                isTerminated.set(true);
                countDownLatch.countDown();
                throw new RuntimeException("Problem while waiting");
            }

            result = value + 1;
            countDownLatch.countDown();
        }
    }
}

Esse exemplo define uma classe que implementa Runnable, executando uma tarefa de longa duração no método run.

O método asyncTask apenas cria uma thread gerenciada (usando o ManagedThreadFactory injetado) em seguida, a inicia. O método asyncHangingTask também cria uma thread gerenciada, a inicia, mas também a para.

A seguinte classe testa esses métodos (código completo pode ser encontrado aqui):

@RunWith(Arquillian.class)
public class ThreadFactoryServiceTest {

    @Inject
    private ThreadFactoryService factoryService;

    @Deployment()
    public static final WebArchive app() {
        return ShrinkWrap.create(WebArchive.class, "example.war")
                .addClasses(ThreadFactoryService.class)
                .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml");
    }

    @Test
    public void asyncTask() throws InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final LongTask longTask = new LongTask(1, 50, countDownLatch);
        factoryService.asyncTask(longTask);

        countDownLatch.await(200, TimeUnit.MILLISECONDS);

        assertEquals(2, longTask.getResult());
    }

    @Test
    public void asyncHangingTask() throws InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final LongTask longTask = new LongTask(1, 1000000, countDownLatch);

        factoryService.asyncHangingTask(longTask);

        countDownLatch.await(200, TimeUnit.MILLISECONDS);

        assertTrue(longTask.getIsTerminated());
    }
}

O exemplo completo do projeto pode ser encontrado aqui. É um projeto Maven, e todos os testes podem ser rodados executando o comando mvn clean install.