current position:Home>Java8 stream is so powerful. Do you know its principle

Java8 stream is so powerful. Do you know its principle

2022-01-27 01:42:25 It Maple fighter

brief introduction : I am a Maple brother , First line Internet IT Migrant workers Senior interviewers Java Founder of flea network . With many years of front-line R & D experience , Master Java, Proficient in data structures and algorithms , Master SSM, Master springboot, Master springcloud Other framework , Proficient distributed , High concurrency and payment modules .

Scan the QR code on the left , Join the group chat , Learning together , Progress together !

Welcome to thumb up Collection Leaving a message.

: Welfare at the end of the article


1.Stream Composition and characteristics of .

2.BaseStream Interface .

3.Stream Interface .

4. Turn off flow operation .

5. Parallel streams and serial streams .

6.Work Stealing principle :

7. from ForkJoinPool From the perspective of ParallelStream.

Summary :

8. Performance of parallel streams .

9.NQ Model .

10. Order encountered .

Java8API Added a new abstraction , It's called a stream Stream, Allows you to process data declaratively .

Stream Use a similar SQL Statement provides an intuitive way to query data from a database Java Higher order abstraction of set operation and expression .

Stream API Can be greatly improved Java Programmer productivity , Let programmers write efficient 、 clean 、 Simple code .

This article will analyze Stream Implementation principle of .

1.Stream Composition and characteristics of .

Stream( flow ) Is a queue of elements from a data source , Support aggregation operations :

  • An element is an object of a particular type , Form a line .Java Medium Stream Elements are not stored and managed like collections , It's on demand .

  • The source of a data stream can be a collection Collection、 Array Aray、I/Ochanel、 generator generator etc. .

  • similar SQL Aggregate operation of statement , Such as filter、map、reduce、find、match、sorted etc. .

And the old Collection Different operation , Stream There are two basic features of operation :

  • Pipelining: The intermediate operation will return the stream object itself . In this way, multiple operations can be connected in series into a pipe , Just like streaming style . This optimizes the operation , Such as delayed execution (laziness evaluation) And short circuit (short-circuiting)

  • Internal iteration : I used to pass Iterator or For-Each Explicitly iterate over the collection traversal , It's called external iteration .Stream Through visitor mode (Visitor) Provide internal iterations .

Unlike iterators ,Stream You can do it in parallel , Iterators can only perform command and serialization operations . seeing the name of a thing one thinks of its function , When traversing in serial mode , Every item Read the next one after reading item. When using parallel traversal , The data will be divided into segments , Each segment is processed in a different thread , Then output the results together .

Stream The parallel operation of depends on Java7 Introduced in Fork/Join frame (JSR166y) To split tasks and speed up processing .Java parallel API The evolution process of is basically as follows :

1.0-1.4 Medium java.lang.Thread

5.0 Medium java.util.concurrent

6.0 Medium Phasers etc.

7.0 Medium Fork/Join frame

8.0 Medium Lambda

Stream Parallel processing capability , The process will be divided into several small tasks , This means that each task is an operation :

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);numbers.parallelStream()       .forEach(out::println);

You can see a simple line of code to help us realize the function of parallel output set elements , But because the order of parallel execution is uncontrollable , The result of each execution may be different .

If it must be the same , have access to forEachOrdered Perform the terminate operation :

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);numbers.parallelStream()       .forEachOrdered(out::println);

Here's a question , If the results need to be ordered , Does it violate our original intention of parallel execution ? Yes , under these circumstances , Obviously, there is no need to use parallel streams , You can use serial streams directly , Otherwise, the performance may be worse , Because all parallel results are eventually forced to sort .

well , Let's introduce Stream Interface knowledge .

2.BaseStream Interface .

Stream The parent interface of is BaseStream, The latter is the top-level interface for all flow implementations , The definition is as follows :

public interface BaseStream<T, S extends BaseStream<T, S>>        extends AutoCloseable {    Iterator<T> iterator();    Spliterator<T> spliterator();    boolean isParallel();    S sequential();    S parallel();    S unordered();    S onClose(Runnable closeHandler);    void close();}

among ,T Is the type of element in the stream ,S yes BaseStream Implementation class of , The elements are also T,S It's my own :

S extends BaseStream<T, S>

Isn't it a little bit muddled ?

in fact , It's easy to understand . Let's look at the interface S Use : for example ,sequential() and parallel() Both methods return s example , That is, it supports the serial or parallel operation of the current stream , And return the changed stream object .

If it's parallel , Must involve the splitting of the current stream , That is, a stream is divided into multiple sub streams , The child stream must be of the same type as the parent stream . Subflow can continue to split subflow and continue to split

in other words , there S yes BaseStream Implementation class of , It is also a stream , Such as Stream、IntStream、LongStream etc. .

3.Stream Interface .

Take a look Stream Interface declaration of :

public interface Stream<T> extends BaseStream<T, Stream<T>>

What is not difficult to understand here is ,Stream Can continue to be divided into Stream, We can prove it by some of its methods :

Stream<T> filter(Predicate<? super T> predicate);<R> Stream<R> map(Function<? super T, ? extends R> mapper);<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);Stream<T> sorted();Stream<T> peek(Consumer<? super T> action);Stream<T> limit(long maxSize);Stream<T> skip(long n);...

These are the intermediate operations of the operation flow , The returned result must be the stream object itself .

4. Turn off flow operation .

Basestream Realized Autocloseable Interface , namely close() Method is called when the stream is closed . meanwhile ,Basestream It also provides us with Onclose() Method :

S onClose(Runnable closeHandler);

When calling AutoCloseable Of close() Interface , Will trigger the call flow object Onclose() Method , But there are a few things to note :

  • onclose() Method will return the stream object itself , You can call the object multiple times .

  • If you call multiple onClose() Method , Is triggered in the calling order , But if something goes wrong , Only the first exception will be thrown upward .

  • The former one onClose() Method throws an exception , It doesn't affect the follow-up onclose() Use of methods .

  • If multiple onClose() Method throws an exception , Show only the first exception stack , Other exceptions are compressed , Show only part of the information .

5. Parallel streams and serial streams .

Basestream The interface provides two methods: parallel flow and serial flow . These two methods can be called multiple times or mixed , But in the end, the return result of the last method call can only prevail .

Reference resources parallel() Method description :

Returns an equivalent stream that is parallel. May return

itself, either because the stream was already parallel, or because

the underlying stream state was modified to be parallel.

therefore , The same method does not produce new streams , Instead, directly reuse the current stream object .

In the following example, with the last call parallel() Subject to , The end result is parallel computing sum:

stream.parallel()   .filter(...)   .sequential()   .map(...)   .parallel()   .sum();

ForkJoin The frame is JDK7 A new feature of . And ThreadPoolexecutor equally , It also achieved Executor and ExecutorService Interface . It uses an infinite queue to hold the tasks to be executed , The number of threads is transmitted through the constructor . If the required number of threads is not transferred to the constructor , Currently available on your computer CPU The number will be set to the number of threads as the default .

ForkJoinPool Mainly used for divide and conquer (Divide-and-conqueralgorithm) solve the problem , Typical applications are _ Fast sorting algorithm _. The point here is ForkJoinPool Need to use relatively few threads to handle a large number of tasks .

for example , If you want to 1000 Ten thousand data are sorted , Divide the task into two 500 Ten thousand sorting tasks and one 500 The task of merging 10000 data .

And so on ,500 Million data will also be segmented , At most, a threshold will be set , To specify when the data size reaches , Such segmentation will be stopped . for example , When the number of elements is less than 10 when , They will stop dividing , And use insert sort to sort it . Last , All the tasks add up to about 2 ten thousand +.

The key to the problem is , For a task , Only when all its subtasks have been completed , It can only be carried out , Imagine the process of merging and sorting .

therefore , Use ThreadPolexecutor when , Using divide and conquer can cause problems , because ThreadPolexecutor The thread in cannot add a task to the task queue , And continue to execute after waiting for the task to be completed . Use ForkJoinPool when , You can create a new task and suspend the current task . here , Threads can select subtasks to execute from the queue .

So use ThreadPolexecutor or ForkJoinPool What are the performance differences ?

First , Use ForkJoinPool A limited number of threads can be used to complete many tasks with parent-child relationships , For example, using 4 Threads to complete more than 200 Ten thousand missions . Use ThreadPolexecutor It's impossible , because ThreadPolexecutor Medium Thread You cannot choose to prioritize subtasks . When you need to finish 200 Ten thousand tasks with father son relationship , Also needed 200 000 threads , This is obviously not feasible .

6.Work Stealing principle :

  • Each worker thread has its own work queue WorkQueue;

  • This is a two terminal queue dequeue, Is a private thread ;

  • ForkJoinTask in fork The subtasks of will be placed in the queue head of the worker thread running the task , The worker thread will follow LIFO Process the tasks in the work queue in the order of , The stack ;

  • In order to make the most of CPU, Idle threads will steal tasks from other threads' queues .

  • But the tail of the work queue steals the task , To reduce the competition with the thread to which the queue belongs ;

  • Two terminal queue operation :push()/pop() Invoked only in the worker thread of its owner ,poll() It is called when another thread steals the task ;

  • When there is only one task left , There will still be passing CAS Realize the competition ;

7. from ForkJoinPool From the perspective of ParallelStream.

Java8 by ForkJoinPool Added a common thread pool , Used to handle tasks that are not explicitly submitted to any thread pool . It is ForkJoinPool Static element of type , The default number of threads is equal to the number of threads running on the computer CPU Count .

call Arrays When adding a new method to a class , Automatic parallelization will occur .

for example , Parallel quick sort for sorting arrays is used to traverse the elements in the array in parallel . Automatic parallelization is also used for Java8 Newly added StreamAPI.

for example , The following code is used to traverse the elements in the list and perform the required actions :

List<UserInfo> userInfoList =        DaoContainers.getUserInfoDAO().queryAllByList(new UserInfoModel());userInfoList.parallelStream().forEach(RedisUserApi::setUserIdUserInfo);

The operations of the elements in the list will be performed in parallel .foreach Method will create a task for the calculation operation of each element , The task will be carried out by the above forkJoinPool Medium comonPool Handle .

Of course ,ThreadPolexecutor You can also complete the above parallel computing logic , But in terms of readability and quantity of code ,ForkJoinPool Obviously better .

about ForkJoinPool The number of threads in the common thread pool , Default values are usually used , That is, the number of computer processors at run time . You can also set system properties :-Djava.util.concurent.ForkJoinPool.common.parallelism=N(N Number of threads ) To adjust ForkJoinPool Number of threads .

It is worth noting that , The thread currently executing will also be used to execute the task , So the final number of threads is N+1,1 Is the current main thread

Here's a question , If you use blocking operations in the execution of parallel streams , such as I/O, It is likely to cause some problems :

public static String query(String question) {  List<String> engines = new ArrayList<String>();  engines.add("");  engines.add("");  engines.add("");  // get element as soon as it is available  Optional<String> result = - {    String url = base + question;    // open connection and fetch the result    return WS.url(url).get();  }).findAny();  return result.get();}

This is a typical example . Let's analyze :

  • This parallel stream computing operation will be performed by the main thread and JVM default ForkJoinPool.commonPool() Joint implementation .

  • map It's a blocking method , Need to access HTTP Interface and get its response, So anything worker Until this point, the thread will be blocked and wait for the result .

  • therefore , When the calculation method is called in parallel elsewhere , Will be affected by the blocking wait method .

  • at present ,ForkJoinPool The implementation of does not consider compensating for waiting blocking and waiting for the work of the newly generated thread worker Threads , therefore ForkJoinPool.comonPool() Threads in will stand by and block waiting .

As we know from the analysis of Liezi above ,lambda The execution of is not done in an instant , All use parallel streams Can be the source of blocking programs , And during execution, other parts of the program will not be able to access these workers, It means any dependence parallel streams What else is occupying your program common ForkJoinPool It's going to be unpredictable and potentially dangerous .

Summary :

  • When you need to deal with the recursive divide and conquer algorithm , Consider using ForkJoinPool.

  • Carefully set the threshold for no longer dividing tasks , This has an impact on performance .

  • ForkJoinPool The common thread pool in will be used for Java8 Some of the features in . In some cases , You need to adjust the default number of threads in the thread pool .

  • lambda Side effects should be avoided as much as possible , That is, avoid heap based state and any IO The mutation of .

  • lambda Should not interfere with each other , in other words , Avoid modifying data sources ( Because this may lead to thread safety )

  • Avoid states that may change during the flow operation life cycle .

8. Performance of parallel streams .

The performance of the parallel flow framework is affected by the following factors :

  • data size : It also takes up , The processing time of each pipeline is long enough , Parallelism makes sense ;

  • Source data structure : Each pipeline operation is based on the initial data source , It's usually a collection , The segmentation of different collection data sources will have a certain consumption ;

  • Packing : Processing basic types is faster than packing types ;

  • Check the number : By default , The more cores , Bottom fork/join The more threads the thread pool starts ;

  • Unit processing overhead : The longer each element spends in the stream , The more obvious the performance improvement brought by parallel operation ;

The source data structure is divided into the following three groups :

  • Good performance :ArrayList, An array or Intstream.range( Data support random reading , Easily split )

  • Average performance :HashSet,TreeSet( Data is not easy to decompose fairly , Most can also )

  • Poor performance :LinkedList( You need to traverse the list , Semi decomposition is difficult ),Stream.itrate and Buferedreader.Lines( Length unknown , Difficult to decompose )

9.NQ Model .

To determine whether parallelism will bring acceleration , The last two factors need to be considered : The amount of data available and the amount of computation per data element .

In our initial parallel decomposition description , The concept we adopt is to split the source , Until the segment is small enough , In order to solve the problems in the segmentation more effectively . The size of the segment must depend on the problem solved and on the workload of each element .

for example , Calculating the length of a string involves much less work than calculating the length of a string SHA-1 Hash value . The more work each element does , The lower the threshold enough to use parallelism . Similarly , More data , The more you divide , Will not conflict with too small a threshold .

A simple but useful parallel performance model is NQ Model , among N Is the number of data elements ,Q Is the workload of each element . The product of N*Q The bigger it is , The more likely it is to achieve parallel acceleration . about Q A small problem , For example, sum of numbers , You usually want to see N>10000 To get acceleration ; With Q An increase in , The data size required to obtain acceleration will be reduced .

Many obstacles to parallelization ( Such as split cost 、 Combination cost or order sensitivity ) Can pass Q Higher operation to alleviate . Although split one LinkedList The result of the feature may be very poor , But as long as it's big enough Q, You can still get parallel acceleration .

10. Order encountered .

Encounter order refers to whether the order in which the source distributes elements is critical to the calculation . Some sources ( Such as hash based collections and mappings ) There is no meaningful encounter order . Flow flag ORDERED Describes whether the flow makes sense .

JDK A collection of spliterator This flag will be set according to the set specification ;

Some intermediate operations may inject ORDERED (sorted()) Or clear it (unordered()).

If the stream does not encounter a sequence , Most stream operations must be sequential . For sequential execution , Will automatically preserve the order encountered , Because elements are handled naturally in the order they are encountered .

Even in parallel execution , Many operations ( Stateless intermediate operations and some termination operations ( Such as reduce()) Nor will there be any actual costs in sequence .

But for other operations ( Stateful intermediate operation , Its semantics is related to the sequence of termination operations encountered , Such as findfirst() or foreachordered()), In parallel execution, the responsibility to follow the order encountered can be very important .

If the flow has a defined encounter order , But this order makes no sense to the result , By using unordered() Operation delete ORDERED sign , Accelerate the sequencing of pipelines that contain sequence sensitive operations .

As an example of sequence sensitive operations , You can consider limit(), It cuts off a stream at the specified size . Implement... In sequential execution limit() It's simple : Keep a counter for how many elements you see , Then discard any elements .

But in parallel execution , Realization limit() It's a lot more complicated ; Before you need to keep N Elements . This requirement greatly limits the ability to use parallelism ; If the input is divided into multiple parts , Only when all the parts before a certain part are completed , Only then can you know whether the result of this part will be included in the final result .

therefore , Before you reach your target length , You usually choose not to use all available cores by mistake , Or cache the whole test results .

If the stream does not encounter a sequence ,limit() You can choose any operation freely n Elements , This makes execution much more efficient . After knowing the elements , Send immediately downstream without any cache , The only coordination that needs to be performed between threads is to send signals , To ensure that the target stream length is not exceeded .

Another unusual example of encountering sequential costs is sorting . If you encounter a meaningful order ,sorted() The operation will achieve a stable sort ( The same elements appear in the output in the same order when entering ), Without disorder , stability ( There is a cost ) It's not necessary .

distinct() There is a similar situation : If a stream has an encounter order ,distinct() The first of multiple identical input elements must be sent , And for disordered flow , It can send any element —— It can also achieve more efficient parallel implementation .

Use colect() Similar situations will be encountered during aggregation . If colect(groupingBy() Execute on an unordered stream , The elements corresponding to any key must be provided to the downstream collector according to the order in which they appear in the input .

This order usually makes no sense to the application , No order makes sense . In these cases , It's best to choose a concurrent collector ( Such as groupingByconcurent(), You can ignore the order encountered , Let all threads collect the shared concurrent data structure directly ( Such as concurenthashMap), Instead of letting each thread collect its own intermediate mapping , Then merge the intermediate mappings ( This can be costly ).

Thank you for your , Keep reading it , Since I chose this road , Then come on , Learning together ! If a friend who has finished learning , You need information to help you find a job , Focus on :IT Maple fighter , Receive the corresponding material benefits !

reply :java Full set of learning resources

reply : Interview information

reply : Feng GE's resume

reply : The programmer confesses the artifact                 ( Say goodbye to the programmer's single dog !)

reply : Part time programmer website

reply : Maple brother 666                             ( obtain 66 A set of actual combat data of the project , Interview video of large factory )

copyright notice
author[It Maple fighter],Please bring the original link to reprint, thank you.

Random recommended