Java IO Tutorial — Java Pipe
A pipe connects an input stream and an output stream.
A piped I/O is based on the producer-consumer pattern, where the producer produces data and the consumer consumes the data.
In a piped I/O, we create two streams representing two ends of the pipe. A PipedOutputStream object represents one end and a PipedInputStream object represents the other end. We connect the two ends using the connect() method on the either object.
We can also connect them by passing one object to the constructor when we create another object.
The following code shows two ways of creating and connecting the two ends of a pipe:
The first method creates a piped input and output streams and connect them. It connects the two streams using connect method.
PipedInputStream pis = new PipedInputStream(); PipedOutputStream pos = new PipedOutputStream(); pis.connect(pos); /* Connect the two ends */
The second method creates piped input and output streams and connect them. It connects the two streams by passing the input piped stream to the output stream constructor.
PipedInputStream pis = new PipedInputStream(); PipedOutputStream pos = new PipedOutputStream(pis);
We can produce and consume data after we connect the two ends of the pipe.
We produce data by using one of the write() methods of the PipedOutputStream object. Whatever we write to the piped output stream automatically becomes available to the piped input stream object for reading.
We use the read() method of PipedInputStream to read data from the pipe. The piped input stream is blocked if data is not available when it attempts to read from the pipe.
A piped stream has a buffer with a fixed capacity to store data between the time it is written to and read from the pipe.
We can set the pipe capacity when we create it. If a pipe’s buffer is full, an attempt to write on the pipe will block.
The following code creates piped input and output streams with the buffer capacity of 2048 bytes.
PipedOutputStream pos = new PipedOutputStream(); PipedInputStream pis = new PipedInputStream(pos, 2048);
A pipe is used to transfer data from one thread to another. The synchronization between two threads is taken care of by the blocking read and write.
Example
The following code demonstrates how to use a piped I/O.
import java.io.PipedInputStream; import java.io.PipedOutputStream; //w w w. j av a 2 s.c o m public class Main < public static void main(String[] args) throws Exception < PipedInputStream pis = new PipedInputStream(); PipedOutputStream pos = new PipedOutputStream(); pos.connect(pis); Runnable producer = () -> produceData(pos); Runnable consumer = () -> consumeData(pis); new Thread(producer).start(); new Thread(consumer).start(); > public static void produceData(PipedOutputStream pos) < try < for (int i = 1; i byte) i); pos.flush(); System.out.println("Writing: " + i); Thread.sleep(500); > pos.close(); > catch (Exception e) < e.printStackTrace(); >> public static void consumeData(PipedInputStream pis) < try < int num = -1; while ((num = pis.read()) != -1) < System.out.println("Reading: " + num); > pis.close(); > catch (Exception e) < e.printStackTrace(); >> >
The code above generates the following result.
java2s.com | © Demo Source and Support. All rights reserved.
Официальный блог lex’a
«пока не начнешь разбираться с решением поставленной задачи ты не понимаешь её реальной сложности. » © lex.
Страницы
вторник, 7 февраля 2012 г.
Java. Pipe. Реализации и сравнение.
Ну вот уж наконец добрался до написания статей. Каникулы прошли, но на них писать было абсолютно лень и вот руки по-тихоньку дошли.:)
Первое с чего решил я начать — Пайпы (Pipe). Pipe является разновидностью «трубок», а конкретно неименнованной однонаправленной «трубкой», т.е. каналом, данные по которому идут в одном направлении, в отличие от сокета, трубки с дву-направленным каналом, данные в котором могут идти в обоих направлениях. Неименованной же трубкой Pipe называется из-за того, что указатель(дескриптор) на уже созданный Pipe можно получить лишь передав его как переменную внутри программы. Именованные трубки, такие как сокет и линуксовый mkfifo могут быть доступны вне создавшей их программы и даже из сети(сокет).
Все выше перечисленные разновидности «трубок» являются незаменимыми средствами межпроцессного взаимодействия (Inter Process Communications — IPC) и реализованы в любой ОС. И соответственно реализованы и в Java (аж 3 способами), т.к. в виртуальной машине Java тоже реализованы свои потоки.
Пусть программа состоит из 2 потоков, которые должны взаимодействовать. Для удобства взаимодействия сделаем удобную обертку объединяющую 2 пайпа и делающую что то на подобие сокета. Т.е. каждому из 2 потоков будет передаваться объект с методами записи и чтения, каждый из которых пишет/читает в/из одного из 2 объединенных пайпов. Для начала ограничимся передачей текстовых сообщений.
Способ первый. java.nio.Pipe
На мой взгляд самый предпочтительный вариант, ибо следующие 2 дают задержку при передаче примерно на 1 секунду. Если честно не понял в чем проблема, буду благодарен за подсказку:).
Для начала приведем полный листинг полученного класса и разберем его:
- import java.io.IOException;
- import java.nio.ByteBuffer;
- import java.nio.channels.Pipe;
- import java.nio.channels.Pipe.SinkChannel;
- import java.nio.channels.Pipe.SourceChannel;
- /**
- *
- * @author Александр Емельянов (mr.lex91@gmail.com)
- */
- public class PipeConnectorBuffered implements IPipe
- private SinkChannel writer;
- private SourceChannel reader;
- private String part= «» ;
- public static IPipe[] createPipe() throws IOException
- PipeConnectorBuffered[] pipeConnector= new PipeConnectorBuffered[ 2 ];
- Pipe p1=Pipe.open();
- Pipe p2=Pipe.open();
- pipeConnector[ 0 ]= new PipeConnectorBuffered(p1.source(), p2.sink());
- pipeConnector[ 1 ]= new PipeConnectorBuffered(p2.source(), p1.sink());
- return pipeConnector;
- >
- PipeConnectorBuffered(SourceChannel reader, SinkChannel writer)
- this .reader=reader;
- this .writer=writer;
- >
- @Override
- public String Read()
- int n= 1024 ;
- int i=n;
- ByteBuffer buf1 = ByteBuffer.allocate(n);
- String str=part;
- try
- while ( true )
- if (str.contains( «» ))
- if (str.indexOf( «» )+ 6
- else part= «» ;
- str=str.substring( 0 , str.indexOf( «» ));
- return str;
- >
- buf1.clear();
- if (i
- i = reader.read(buf1);
- buf1.flip();
- str+= new String(buf1.array(), 0 ,i);
- >
- > catch (IOException ex) <>
- return str;
- >
- @Override
- public void Write(String text)
- ByteBuffer buf = ByteBuffer.allocate( 2 *text.length()+ 5 );
- buf.clear();
- buf.put((text+ «» ).getBytes());
- buf.flip();
- try
- writer.write(buf);
- > catch (IOException ex) <>
- >
- >
Пайп создается одним простым вызовом Pipe.open() и еще 2 вызовами получаются дескрипторы 2 концов пайпа: SourceChannel(конец пайпа, из которого чиают) и SinkChannel(конец пайпа, в который пишут). Собственно с этими 2 объектами нам и надо разобраться. Что бы создать согласно нашей задаче 2 взаимосвязных объекта необходимо в каждый из них передать пишущий конец одного пайпа и читающий другого. Это и делает статичная функция createPipe(), она возвращает 2 объекта PipeConnectorBuffered, конструктор которого приватен, т.к. напрямую вызывать его вне класса необходимости нет.
SourceChannel принимает на как параметр метода read() и SinkChannel в метод write() переменную типа ByteBuffer. Метод read() возвращает количество прочитанных байт.
Думаю алгоритм записи и чтения сообщения вам понятен (вам понятно для чего нужны метка и переменная part?:)). Если честно, то в остальных 2 реализациях PipeConnector функции read() и write() почти не отличаются (только типом оперируемых данных).
Большего интереса представляют собственно эти самые отличия.
2-й и 3-й способы
В отличие от java.nio.Pipe в этих случаях пайп создается по другому. Сначала создается пишущий конец (PipedOutputStream и PipedWriter), а затем читающий (PipedInputStream и PipedReader), в конструктор которого передается пишущий конец.
createPipe() для PipeConnectorStreamed:
Теперь вы можете оценить во истину удобство введения интерфейса IPiped :). Вне зависимости от выбора одного из 3 наших классов при тестировании (а переключаться приходится часто;)) использование в дальнейшем коде будет неизменным, поэтому меняется только 1 строчка на соответствующий .createPipe().
Метод read() будет отличаться типом буфера для считывания. Для PipeConnectorRW ByteBuffer просто заменяется на CharBuffer, а для PipeConnectorStreamed на просто byte[] и соответственно уходят строки buf1.clear() и buf1.flip().
Метод write() для PipeConnectorRW сократиться и станет минимальным с одним вызовом принимающего в качестве параметра строку метода write() PipedWriter’a. А для PipeConnectorStreamed в метод write() необходимо передать именно массив байт, его получим из метода array() нашего ByteBuffer’a, а так же индексы из этого массива, означающие диапазон записываемых в пайп байт. Т.к. нам нужно записать все переданные байты, то в качестве индексов указываем 0 и длину передаваемого текста (text.length()+5).
Полный листинг классов PipeConnectorRW и PipeConnectorStreamed приведен ниже под катом.
- import java.io.IOException;
- import java.io.PipedReader;
- import java.io.PipedWriter;
- import java.nio.ByteBuffer;
- import java.nio.CharBuffer;
- /**
- *
- * @author Александр Емельянов (mr.lex91@gmail.com)
- */
- public class PipeConnectorRW implements IPipe
- private PipedWriter writer;
- private PipedReader reader;
- private String part= «» ;
- public static IPipe[] createPipe() throws IOException
- IPipe[] pc= new PipeConnectorRW[ 2 ];
- PipedWriter pw1= new PipedWriter();
- PipedReader pr1= new PipedReader(pw1);
- PipedWriter pw2= new PipedWriter();
- PipedReader pr2= new PipedReader(pw2);
- pc[ 0 ]= new PipeConnectorRW(pr1,pw2);
- pc[ 1 ]= new PipeConnectorRW(pr2,pw1);
- return pc;
- >
- PipeConnectorRW(PipedReader reader, PipedWriter writer)
- this .reader=reader;
- this .writer=writer;
- >
- public String Read()
- int n= 1024 ;
- int i=n;
- CharBuffer buf1 = CharBuffer.allocate(n);
- String str=part;
- try
- while ( true )
- if (str.contains( «» ))
- if (str.indexOf( «» )+ 6
- else part= «» ;
- str=str.substring( 0 , str.indexOf( «» ));
- return str;
- >
- buf1.clear();
- if (i
- i = reader.read(buf1);
- buf1.flip();
- str+= new String(buf1.array(), 0 ,i);
- >
- > catch (IOException ex) <>
- return str;
- >
- public void Write(String text)
- try
- writer.write(text+ «» );
- > catch (IOException ex) <>
- >
- >
- import java.io.*;
- import java.nio.ByteBuffer;
- import java.nio.CharBuffer;
- /**
- *
- * @author Александр Емельянов (mr.lex91@gmail.com)
- */
- public class PipeConnectorStreamed implements IPipe
- private PipedOutputStream writer;
- private PipedInputStream reader;
- private String part= «» ;
- public static IPipe[] createPipe() throws IOException
- IPipe[] pc= new PipeConnectorStreamed[ 2 ];
- PipedOutputStream pw1= new PipedOutputStream();
- PipedInputStream pr1= new PipedInputStream(pw1);
- PipedOutputStream pw2= new PipedOutputStream();
- PipedInputStream pr2= new PipedInputStream(pw2);
- pc[ 0 ]= new PipeConnectorStreamed(pr1,pw2);
- pc[ 1 ]= new PipeConnectorStreamed(pr2,pw1);
- return pc;
- >
- PipeConnectorStreamed(PipedInputStream reader, PipedOutputStream writer)
- this .reader=reader;
- this .writer=writer;
- >
- public String Read()
- int n= 1024 ;
- int i=n;
- byte [] buf1 = new byte [n];
- String str=part;
- try
- while ( true )
- if (str.contains( «» ))
- if (str.indexOf( «» )+ 6
- else part= «» ;
- str=str.substring( 0 , str.indexOf( «» ));
- return str;
- >
- if (i
- i = reader.read(buf1);
- str+= new String(buf1, 0 ,i);
- >
- > catch (IOException ex) <>
- return str;
- >
- public void Write(String text)
- ByteBuffer buf = ByteBuffer.allocate( 2 *text.length()+ 5 );
- buf.clear();
- buf.put((text+ «» ).getBytes());
- buf.flip();
- try
- writer.write(buf.array(), 0 ,text.length()+ 5 );
- > catch (IOException ex) <>
- >
- >
В данной статье представлена только пересылка текста. Пересылка файла либо других бинарных данных не вызовет больших затруднений, ибо передача все равно идет байтовая (кроме PipeConnectorRW).