Java 22 introduz coletores de fluxo, um novo mecanismo para manipular fluxos de dados. Os coletores de fluxo são o recurso fornecido pelo JEP 461, permitindo que os desenvolvedores criem operadores intermediários personalizados que simplificam operações complexas. À primeira vista, os coletores de fluxo parecem um pouco complexos e obscuros, e você pode se perguntar por que precisa deles. Mas quando você se depara com uma situação que exige um certo tipo de manipulação de stream, os coletores se tornam uma adição óbvia e bem-vinda à API Stream.
A API Stream e coletores de stream
Os fluxos Java modelam coleções dinâmicas de elementos. Como diz a especificação, “Um fluxo é uma sequência de valores potencialmente ilimitada e computada lentamente”.
Isso significa que você pode consumir e operar fluxos de dados indefinidamente. Pense nisso como sentar-se à beira de um rio e observar a água passar. Você nunca pensaria em esperar o rio acabar. Com os riachos, você simplesmente começa a trabalhar com o rio e tudo o que ele contém. Quando terminar, você vai embora.
A API Stream possui vários métodos integrados para trabalhar nos elementos em uma sequência de valores. Estes são os operadores funcionais como filter
e map
.
Na API Stream, os streams começam com uma fonte de eventos e operações como filter
e map
são conhecidas como operações “intermediárias”. Cada operação intermediária retorna o fluxo, para que você possa compô-los juntos. Mas com a API Stream, Java não começará a aplicar nenhuma dessas operações até que o stream atinja uma operação “terminal”. Isto suporta um processamento eficiente mesmo com muitos operadores encadeados.
Os operadores intermediários integrados do Stream são poderosos, mas não conseguem cobrir toda a gama de requisitos imagináveis. Para situações fora da caixa, precisamos de uma maneira de definir operações personalizadas. Os coletores nos dão isso.
O que você pode fazer com coletores de fluxo
Digamos que você está na margem do rio e folhas estão flutuando com números escritos nelas. Se você quiser fazer algo simples, como criar uma matriz de todos os números pares que você vê, você pode usar o built-in filter
método:
List numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
numbers.stream().filter(number -> number % 2 == 0).toArray()
// result: { 2, 4, 6 }
No exemplo acima, começamos com um array de inteiros (a fonte) e depois o transformamos em um stream, aplicando um filtro que retorna apenas aqueles números cuja divisão por dois não deixa resto. O toArray()
call é a chamada do terminal. Isso equivale a verificar a uniformidade de cada folha e colocá-la de lado se passar.
Métodos integrados do Stream Gatherers
A interface java.util.stream.Gatherers vem com algumas funções integradas que permitem criar operações intermediárias personalizadas. Vamos dar uma olhada no que cada um faz.
O método windowFixed
E se você quisesse pegar todas as folhas que flutuam e coletá-las em baldes de duas? Isso é surpreendentemente complicado com operadores funcionais integrados. Requer a transformação de uma matriz de dígitos únicos em uma matriz de matrizes.
O windowFixed
O método é uma maneira mais simples de reunir suas folhas em baldes:
Stream.iterate(0, i -> i + 1)
.gather(Gatherers.windowFixed(2))
.limit(5)
.collect(Collectors.toList());
Isto diz: Dê-me um fluxo baseado na iteração de números inteiros por 1. Transforme cada dois elementos em um novo array. Faça isso cinco vezes. Finalmente, transforme o fluxo em um List
. O resultado é:
((0, 1), (2, 3), (4, 5), (6, 7), (8, 9))
Janelar é como mover um quadro sobre o fluxo; permite tirar fotos.
O método windowSliding
Outra função de janelamento é windowSliding, que funciona como windowFixed()
exceto que cada janela começa no próximo elemento da matriz de origem, e não no final da última janela. Aqui está um exemplo:
Stream.iterate(0, i -> i + 1)
.gather(Gatherers.windowSliding(2))
.limit(5)
.collect(Collectors.toList());
A saída é:
((0, 1), (1, 2), (2, 3), (3, 4), (4, 5))
Compare o windowSliding
saída com a saída de windowFixed
e você verá a diferença. Cada subarray em windowSliding
contém o último elemento do subarray anterior, ao contrário windowFixed
.
O método Gatherers.fold
Gatherers.fold
é como uma versão refinada do método Stream.reduce. É um pouco sutil ver onde fold()
vem a calhar reduce()
. Uma boa discussão é encontrada neste artigo. Aqui está o que o autor, Viktor Klang, tem a dizer sobre as diferenças entre fold
e reduce
:
Dobrar é uma generalização da redução. Com a redução, o tipo de resultado é igual ao tipo de elemento, o combinador é associativo e o valor inicial é uma identidade para o combinador. Para uma dobra, essas condições não são necessárias, embora desistamos da paralelização.
Então vemos isso reduce
é um tipo de fold
. A redução pega um fluxo e o transforma em um único valor. O dobramento também faz isso, mas afrouxa os requisitos: 1) que o tipo de retorno seja do mesmo tipo que os elementos do fluxo; 2) que o combinador é associativo; e 3) que o inicializador em fold
é uma função geradora real, não um valor estático.
O segundo requisito é relevante para a paralelização, que discutirei com mais detalhes em breve. Chamando Stream.parallel
em um stream significa que o mecanismo pode dividir o trabalho em vários threads. Isto só funciona se o operador for associativo; isto é, funciona se a ordem das operações não afetar o resultado.
Aqui está um uso simples de fold
:
Stream.of("hello","world","how","are","you?")
.gather(
Gatherers.fold(() -> "",
(acc, element) -> acc.isEmpty() ? element : acc + "," + element
)
)
.findFirst()
.get();
Este exemplo pega a coleção de strings e as combina com vírgulas. O mesmo trabalho feito por reduce
:
String result = Stream.of("hello", "world", "how", "are", "you?")
.reduce("", (acc, element) -> acc.isEmpty() ? element : acc + "," + element);
Você pode ver isso com fold
você define uma função (() -> “”
) em vez de um valor inicial (“”
). Isso significa que se você precisar de um tratamento mais complexo do iniciador, poderá usar o closure
função.
Agora vamos pensar nas vantagens de fold
com relação a uma diversidade de tipos. Digamos que temos um fluxo de tipos de objetos mistos e queremos contar ocorrências:
var result = Stream.of(1,"hello", true).gather(Gatherers.fold(() -> 0, (acc, el) -> acc + 1));
// result.findFirst().get() = 3
O result var
é 3. Observe que o fluxo tem um número, uma string e um booleano. Realizando um feito semelhante com reduce
é difícil porque o argumento do acumulador (acc
) é fortemente tipado:
// bad, throws exception:
var result = Stream.of(1, "hello", true).reduce(0, (acc, el) -> acc + 1);
// Error: bad operand types for binary operator '+'
Poderíamos usar um collector
para realizar este trabalho:
var result2 = Stream.of("apple", "banana", "apple", "orange")
.collect(Collectors.toMap(word -> word, word -> 1, Integer::sum, HashMap::new));
Mas então perdemos o acesso ao inicializador e ao corpo das funções de dobramento se precisarmos de uma lógica mais envolvida.
O método Gatherers.scan
Digitalizar é algo como windowFixed
mas acumula os elementos em um único elemento em vez de uma matriz. Novamente, um exemplo dá mais clareza (este exemplo é do Javadocs):
Stream.of(1,2,3,4,5,6,7,8,9)
.gather(
Gatherers.scan(() -> "", (string, number) -> string + number)
)
.toList();
A saída é:
("1", "12", "123", "1234", "12345", "123456", "1234567", "12345678", "123456789")
Então, scan
nos permite percorrer os elementos do fluxo e combiná-los cumulativamente.
O método mapConcurrent
Com mapConcurrent, você pode especificar um número máximo de threads para usar simultaneamente na execução do map
função fornecida. Threads virtuais serão usados. Aqui está um exemplo simples que limita a simultaneidade a quatro threads enquanto eleva ao quadrado os números (observe que mapConcurrent
é um exagero para um conjunto de dados tão simples):
Stream.of(1,2,3,4,5).gather(Gatherers.mapConcurrent(4, x -> x * x)).collect(Collectors.toList());
// Result: (1, 4, 9, 16, 25)
Além do fio máximo, mapConcurrent
funciona exatamente como o padrão map
função.
Conclusão
Até que os coletores de fluxo sejam promovidos como um recurso, você ainda precisará usar o --enable-preview
bandeira para acessar o Gatherer
interface e seus recursos. Uma maneira fácil de experimentar é usar JShell: $ jshell --enable-preview
.
Embora não sejam uma necessidade diária, os coletores de fluxo preenchem algumas lacunas antigas na API Stream e facilitam para os desenvolvedores estender e personalizar programas Java funcionais.