Hi All,
This is something to test on all platforms before push. I've used it on 64-bit linux without problems so far.
Levente
On Mon, 25 Jul 2016, commits@source.squeak.org wrote:
Levente Uzonyi uploaded a new version of Network to project The Inbox: http://source.squeak.org/inbox/Network-ul.180.mcz
==================== Summary ====================
Name: Network-ul.180 Author: ul Time: 25 July 2016, 8:40:01.001452 pm UUID: 2f23a55c-fec5-41ac-95bd-6a8c2458be95 Ancestors: Network-nice.179
Socket changes:
- fixed the comment of #isOtherEndConnected and #isThisEndConnected
- do not slice the data (TCP) in the image in #sendData:. Let the VM, the kernel, the hardware deal with that.
- use #isOtherEndConnected when receiving data, and #isThisEndConnected when sending data instead of #isConnected
- move away from #milliseconds:since:, since we have a clock that won't roll over
=============== Diff against Network-nice.179 ===============
Item was changed: ----- Method: Socket>>closeAndDestroy: (in category 'connection open/close') ----- closeAndDestroy: timeoutSeconds "First, try to close this connection gracefully. If the close attempt fails or times out, abort the connection. In either case, destroy the socket. Do nothing if the socket has already been destroyed (i.e., if its socketHandle is nil)."
- socketHandle ifNil: [ ^self ].
- self isThisEndConnected ifTrue: [
self close. "Close this end." ].
- (self waitForDisconnectionFor: timeoutSeconds) ifFalse: [
"The other end has not closed the connect yet, so we will just abort it."
self primSocketAbortConnection: socketHandle ].
- self destroy!
- socketHandle ifNotNil: [
self isConnected ifTrue: [
self close. "close this end"
(self waitForDisconnectionFor: timeoutSeconds) ifFalse: [
"The other end didn't close so we just abort the connection"
self primSocketAbortConnection: socketHandle]].
self destroy].
- !
Item was changed: ----- Method: Socket>>discardReceivedData (in category 'receiving') ----- discardReceivedData "Discard any data received up until now, and return the number of bytes discarded."
| buf totalBytesDiscarded | buf := String new: 10000. totalBytesDiscarded := 0.
- [self isOtherEndConnected and: [self dataAvailable]] whileTrue: [
- [self isConnected and: [self dataAvailable]] whileTrue: [ totalBytesDiscarded := totalBytesDiscarded + (self receiveDataInto: buf)]. ^ totalBytesDiscarded
!
Item was changed: ----- Method: Socket>>isOtherEndConnected (in category 'queries') ----- isOtherEndConnected
- "Return true if this socket is connected, or this end has closed the connection but not the other end, so we can still receive data."
"Return true if this socket is connected, or this end has closed the connection but not the other end, so we can still send data."
| state | socketHandle ifNil: [ ^false ]. (state := self primSocketConnectionStatus: socketHandle) == Connected ifTrue: [ ^true ]. ^state == ThisEndClosed
!
Item was changed: ----- Method: Socket>>isThisEndConnected (in category 'queries') ----- isThisEndConnected
- "Return true if this socket is connected, other the other end has closed the connection but not this end, so we can still send data."
"Return true if this socket is connected, other the other end has closed the connection but not this end, so we can still receive data."
| state | socketHandle ifNil: [ ^false ]. (state := self primSocketConnectionStatus: socketHandle) == Connected ifTrue: [ ^true ]. ^state == OtherEndClosed
!
Item was changed: ----- Method: Socket>>sendData: (in category 'sending') ----- sendData: aStringOrByteArray "Send all of the data in the given array, even if it requires multiple calls to send it all. Return the number of bytes sent."
"An experimental version use on slow lines: Longer timeout and smaller writes to try to avoid spurious timeouts."
| bytesSent bytesToSend count | bytesToSend := aStringOrByteArray size. bytesSent := 0. [bytesSent < bytesToSend] whileTrue: [ (self waitForSendDoneFor: 60) ifFalse: [ConnectionTimedOut signal: 'send data timeout; data not sent']. count := self primSocket: socketHandle sendData: aStringOrByteArray startIndex: bytesSent + 1
count: bytesToSend - bytesSent.
count: (bytesToSend - bytesSent min: DefaultSendBufferSize).
bytesSent := bytesSent + count].
^ bytesSent
!
Item was changed: ----- Method: Socket>>waitForConnectionFor:ifTimedOut:ifRefused: (in category 'waiting') ----- waitForConnectionFor: timeout ifTimedOut: timeoutBlock ifRefused: refusedBlock "Wait up until the given deadline for a connection to be established. Return true if it is established by the deadline, false if not."
- | deadline timeLeft status |
- deadline := Time millisecondClockValue + (timeout * 1000) truncated.
- (status := self primSocketConnectionStatus: socketHandle) == Connected ifTrue: [^true].
- [ (status == WaitingForConnection) and: [ (timeLeft := deadline - Time millisecondClockValue) > 0 ] ]
- | startTime msecsDelta msecsEllapsed status |
- startTime := Time millisecondClockValue.
- msecsDelta := (timeout * 1000) truncated.
- status := self primSocketConnectionStatus: socketHandle.
- status = Connected ifTrue: [^true].
- [(status = WaitingForConnection) and: [(msecsEllapsed := Time millisecondsSince: startTime) < msecsDelta]] whileTrue: [
semaphore waitTimeoutMSecs: timeLeft.
status := self primSocketConnectionStatus: socketHandle ].
- status == Connected ifTrue: [ ^true ].
- status == WaitingForConnection
ifTrue: [ timeoutBlock value ]
ifFalse: [ refusedBlock value ].
- ^false!
semaphore waitTimeoutMSecs: msecsDelta - msecsEllapsed.
status := self primSocketConnectionStatus: socketHandle].
- status = Connected
ifFalse: [
status = WaitingForConnection
ifTrue: [timeoutBlock value]
ifFalse: [refusedBlock value].
^false].
- ^ true!
Item was changed: ----- Method: Socket>>waitForConnectionUntil: (in category 'waiting') ----- waitForConnectionUntil: deadline "Wait up until the given deadline for a connection to be established. Return true if it is established by the deadline, false if not."
- | status timeLeft |
- | status waitTime | [ (status := self primSocketConnectionStatus: socketHandle) == Connected ifTrue: [ ^true ]. status == WaitingForConnection ifFalse: [ ^false ].
(timeLeft := deadline - Time millisecondClockValue) <= 0 ifTrue: [ ^false ].
semaphore waitTimeoutMSecs: timeLeft ] repeat!
(waitTime := deadline - Time millisecondClockValue) > 0 ifFalse: [ ^false ].
semaphore waitTimeoutMSecs: waitTime ] repeat!
Item was changed: ----- Method: Socket>>waitForDataFor:ifClosed:ifTimedOut: (in category 'waiting') ----- waitForDataFor: timeout ifClosed: closedBlock ifTimedOut: timedOutBlock "Wait for the given nr of seconds for data to arrive."
- | deadline timeLeft |
- | startTime msecsDelta | socketHandle ifNil: [ ^closedBlock value ].
- deadline := Time millisecondClockValue + (timeout * 1000) truncated.
- startTime := Time millisecondClockValue.
- msecsDelta := (timeout * 1000) truncated. [ (self primSocketReceiveDataAvailable: socketHandle) ifTrue: [ ^self ].
self isOtherEndConnected ifFalse: [ ^closedBlock value ].
(timeLeft := deadline - Time millisecondClockValue) <= 0 ifTrue: [ ^timedOutBlock value ].
self isConnected ifFalse: [ ^closedBlock value ].
"Providing a maximum for the time for waiting is a workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Remove the ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is fixed." readSemaphore waitTimeoutMSecs:(Time millisecondsSince: startTime) < msecsDelta ifFalse: [ ^timedOutBlock value ].
(timeLeft min: self class maximumReadSemaphoreWaitTimeout) ] repeat!
(msecsDelta - (Time millisecondsSince: startTime) min: self class maximumReadSemaphoreWaitTimeout) ] repeat!
Item was changed: ----- Method: Socket>>waitForDataIfClosed: (in category 'waiting') ----- waitForDataIfClosed: closedBlock "Wait indefinitely for data to arrive. This method will block until data is available or the socket is closed."
socketHandle ifNil: [ ^closedBlock value ]. [ (self primSocketReceiveDataAvailable: socketHandle) ifTrue: [ ^self ].
self isOtherEndConnected ifFalse: [ ^closedBlock value ].
"ul 8/13/2014 21:16 Providing a maximum for the time for waiting is a workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Replace the ""waitTimeoutMSecs: self class maximumReadSemaphoreWaitTimeout"" part with ""wait"" when the bug is fixed." readSemaphore waitTimeoutMSecs: self class maximumReadSemaphoreWaitTimeout ] repeat!self isConnected ifFalse: [ ^closedBlock value ].
Item was changed: ----- Method: Socket>>waitForDisconnectionFor: (in category 'waiting') ----- waitForDisconnectionFor: timeout "Wait for the given nr of seconds for the connection to be broken. Return true if it is broken by the deadline, false if not. The client should know the connection is really going to be closed (e.g., because he has called 'close' to send a close request to the other end) before calling this method."
- | deadline |
- deadline := Time millisecondClockValue + (timeout * 1000) truncated.
- [ self isOtherEndConnected and: [ deadline - Time millisecondClockValue > 0 ] ]
whileTrue: [
self discardReceivedData.
"Providing a maximum for the time for waiting is a workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Remove the ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is fixed."
readSemaphore waitTimeoutMSecs:
(deadline - Time millisecondClockValue min: self class maximumReadSemaphoreWaitTimeout) ].
- ^self isOtherEndConnected!
- | startTime msecsDelta status |
- startTime := Time millisecondClockValue.
- msecsDelta := (timeout * 1000) truncated.
- status := self primSocketConnectionStatus: socketHandle.
- [((status == Connected) or: [(status == ThisEndClosed)]) and:
[(Time millisecondsSince: startTime) < msecsDelta]] whileTrue: [
self discardReceivedData.
"Providing a maximum for the time for waiting is a workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Remove the ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is fixed."
readSemaphore waitTimeoutMSecs:
(msecsDelta - (Time millisecondsSince: startTime) min: self class maximumReadSemaphoreWaitTimeout).
status := self primSocketConnectionStatus: socketHandle].
- ^ status ~= Connected!
Item was changed: ----- Method: Socket>>waitForSendDoneFor: (in category 'waiting') ----- waitForSendDoneFor: timeout "Wait up until the given deadline for the current send operation to complete. Return true if it completes by the deadline, false if not."
- | deadline timeleft |
- deadline := Time millisecondClockValue + (timeout * 1000) truncated.
- | startTime msecsDelta msecsEllapsed |
- startTime := Time millisecondClockValue.
- msecsDelta := (timeout * 1000) truncated. [ (self primSocketSendDone: socketHandle) ifTrue: [ ^true ].
self isThisEndConnected ifFalse: [ ^false ].
(timeleft := deadline - Time millisecondClockValue) <= 0 ifTrue: [ ^false ].
writeSemaphore waitTimeoutMSecs: timeleft ] repeat!
self isConnected ifFalse: [ ^false ].
(msecsEllapsed := Time millisecondsSince: startTime) < msecsDelta ifFalse: [ ^false ].
writeSemaphore waitTimeoutMSecs: msecsDelta - msecsEllapsed ] repeat!
Hi Levente, Magma's test cases can't seem to get to the end with this.. I haven't investigated yet..
On Mon, Jul 25, 2016 at 2:40 PM, Levente Uzonyi leves@caesar.elte.hu wrote:
Hi All,
This is something to test on all platforms before push. I've used it on 64-bit linux without problems so far.
Levente
On Mon, 25 Jul 2016, commits@source.squeak.org wrote:
Levente Uzonyi uploaded a new version of Network to project The Inbox: http://source.squeak.org/inbox/Network-ul.180.mcz
==================== Summary ====================
Name: Network-ul.180 Author: ul Time: 25 July 2016, 8:40:01.001452 pm UUID: 2f23a55c-fec5-41ac-95bd-6a8c2458be95 Ancestors: Network-nice.179
Socket changes:
- fixed the comment of #isOtherEndConnected and #isThisEndConnected
- do not slice the data (TCP) in the image in #sendData:. Let the VM, the
kernel, the hardware deal with that.
- use #isOtherEndConnected when receiving data, and #isThisEndConnected
when sending data instead of #isConnected
- move away from #milliseconds:since:, since we have a clock that won't
roll over
=============== Diff against Network-nice.179 ===============
Item was changed: ----- Method: Socket>>closeAndDestroy: (in category 'connection open/close') ----- closeAndDestroy: timeoutSeconds "First, try to close this connection gracefully. If the close attempt fails or times out, abort the connection. In either case, destroy the socket. Do nothing if the socket has already been destroyed (i.e., if its socketHandle is nil)."
socketHandle ifNil: [ ^self ].
self isThisEndConnected ifTrue: [
self close. "Close this end." ].
(self waitForDisconnectionFor: timeoutSeconds) ifFalse: [
"The other end has not closed the connect yet, so we will
just abort it."
self primSocketAbortConnection: socketHandle ].
self destroy!
socketHandle ifNotNil: [
self isConnected ifTrue: [
self close. "close this end"
(self waitForDisconnectionFor:
timeoutSeconds) ifFalse: [
"The other end didn't
close so we just abort the connection"
self
primSocketAbortConnection: socketHandle]].
self destroy].
- !
Item was changed: ----- Method: Socket>>discardReceivedData (in category 'receiving') ----- discardReceivedData "Discard any data received up until now, and return the number of bytes discarded."
| buf totalBytesDiscarded | buf := String new: 10000. totalBytesDiscarded := 0.
[self isOtherEndConnected and: [self dataAvailable]] whileTrue: [
[self isConnected and: [self dataAvailable]] whileTrue: [ totalBytesDiscarded := totalBytesDiscarded + (self receiveDataInto:
buf)]. ^ totalBytesDiscarded !
Item was changed: ----- Method: Socket>>isOtherEndConnected (in category 'queries') ----- isOtherEndConnected
"Return true if this socket is connected, or this end has closed
the connection but not the other end, so we can still receive data."
"Return true if this socket is connected, or this end has closed
the connection but not the other end, so we can still send data."
| state | socketHandle ifNil: [ ^false ]. (state := self primSocketConnectionStatus: socketHandle) ==
Connected ifTrue: [ ^true ]. ^state == ThisEndClosed !
Item was changed: ----- Method: Socket>>isThisEndConnected (in category 'queries') ----- isThisEndConnected
"Return true if this socket is connected, other the other end has
closed the connection but not this end, so we can still send data."
"Return true if this socket is connected, other the other end has
closed the connection but not this end, so we can still receive data."
| state | socketHandle ifNil: [ ^false ]. (state := self primSocketConnectionStatus: socketHandle) ==
Connected ifTrue: [ ^true ]. ^state == OtherEndClosed !
Item was changed: ----- Method: Socket>>sendData: (in category 'sending') ----- sendData: aStringOrByteArray "Send all of the data in the given array, even if it requires multiple calls to send it all. Return the number of bytes sent."
"An experimental version use on slow lines: Longer timeout and
smaller writes to try to avoid spurious timeouts."
| bytesSent bytesToSend count | bytesToSend := aStringOrByteArray size. bytesSent := 0. [bytesSent < bytesToSend] whileTrue: [ (self waitForSendDoneFor: 60) ifFalse: [ConnectionTimedOut signal: 'send data
timeout; data not sent']. count := self primSocket: socketHandle sendData: aStringOrByteArray startIndex: bytesSent + 1
count: bytesToSend - bytesSent.
count: (bytesToSend - bytesSent min:
DefaultSendBufferSize). bytesSent := bytesSent + count].
^ bytesSent
!
Item was changed: ----- Method: Socket>>waitForConnectionFor:ifTimedOut:ifRefused: (in category 'waiting') ----- waitForConnectionFor: timeout ifTimedOut: timeoutBlock ifRefused: refusedBlock "Wait up until the given deadline for a connection to be established. Return true if it is established by the deadline, false if not."
| deadline timeLeft status |
deadline := Time millisecondClockValue + (timeout * 1000)
truncated.
(status := self primSocketConnectionStatus: socketHandle) ==
Connected ifTrue: [^true].
[ (status == WaitingForConnection) and: [ (timeLeft := deadline -
Time millisecondClockValue) > 0 ] ]
| startTime msecsDelta msecsEllapsed status |
startTime := Time millisecondClockValue.
msecsDelta := (timeout * 1000) truncated.
status := self primSocketConnectionStatus: socketHandle.
status = Connected ifTrue: [^true].
[(status = WaitingForConnection) and: [(msecsEllapsed := Time
millisecondsSince: startTime) < msecsDelta]] whileTrue: [
semaphore waitTimeoutMSecs: timeLeft.
status := self primSocketConnectionStatus:
socketHandle ].
status == Connected ifTrue: [ ^true ].
status == WaitingForConnection
ifTrue: [ timeoutBlock value ]
ifFalse: [ refusedBlock value ].
^false!
semaphore waitTimeoutMSecs: msecsDelta -
msecsEllapsed.
status := self primSocketConnectionStatus:
socketHandle].
status = Connected
ifFalse: [
status = WaitingForConnection
ifTrue: [timeoutBlock value]
ifFalse: [refusedBlock value].
^false].
^ true!
Item was changed:
----- Method: Socket>>waitForConnectionUntil: (in category 'waiting')
waitForConnectionUntil: deadline "Wait up until the given deadline for a connection to be established. Return true if it is established by the deadline, false if not."
| status timeLeft |
| status waitTime | [ (status := self primSocketConnectionStatus: socketHandle)
== Connected ifTrue: [ ^true ]. status == WaitingForConnection ifFalse: [ ^false ].
(timeLeft := deadline - Time millisecondClockValue) <= 0
ifTrue: [ ^false ].
semaphore waitTimeoutMSecs: timeLeft ] repeat!
(waitTime := deadline - Time millisecondClockValue) > 0
ifFalse: [ ^false ].
semaphore waitTimeoutMSecs: waitTime ] repeat!
Item was changed: ----- Method: Socket>>waitForDataFor:ifClosed:ifTimedOut: (in category 'waiting') ----- waitForDataFor: timeout ifClosed: closedBlock ifTimedOut: timedOutBlock "Wait for the given nr of seconds for data to arrive."
| deadline timeLeft |
| startTime msecsDelta | socketHandle ifNil: [ ^closedBlock value ].
deadline := Time millisecondClockValue + (timeout * 1000)
truncated.
startTime := Time millisecondClockValue.
msecsDelta := (timeout * 1000) truncated. [ (self primSocketReceiveDataAvailable: socketHandle)
ifTrue: [ ^self ].
self isOtherEndConnected ifFalse: [ ^closedBlock value ].
(timeLeft := deadline - Time millisecondClockValue) <= 0
ifTrue: [ ^timedOutBlock value ].
self isConnected ifFalse: [ ^closedBlock value ].
(Time millisecondsSince: startTime) < msecsDelta ifFalse:
[ ^timedOutBlock value ]. "Providing a maximum for the time for waiting is a workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Remove the ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is fixed." readSemaphore waitTimeoutMSecs:
(timeLeft min: self class
maximumReadSemaphoreWaitTimeout) ] repeat!
(msecsDelta - (Time millisecondsSince: startTime)
min: self class maximumReadSemaphoreWaitTimeout) ] repeat!
Item was changed: ----- Method: Socket>>waitForDataIfClosed: (in category 'waiting') ----- waitForDataIfClosed: closedBlock "Wait indefinitely for data to arrive. This method will block until data is available or the socket is closed."
socketHandle ifNil: [ ^closedBlock value ]. [ (self primSocketReceiveDataAvailable: socketHandle)
ifTrue: [ ^self ].
self isOtherEndConnected ifFalse: [ ^closedBlock value ].
self isConnected ifFalse: [ ^closedBlock value ]. "ul 8/13/2014 21:16 Providing a maximum for the time for waiting is a
workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Replace the ""waitTimeoutMSecs: self class maximumReadSemaphoreWaitTimeout"" part with ""wait"" when the bug is fixed." readSemaphore waitTimeoutMSecs: self class maximumReadSemaphoreWaitTimeout ] repeat!
Item was changed:
----- Method: Socket>>waitForDisconnectionFor: (in category 'waiting')
waitForDisconnectionFor: timeout "Wait for the given nr of seconds for the connection to be broken. Return true if it is broken by the deadline, false if not. The client should know the connection is really going to be closed (e.g., because he has called 'close' to send a close request to the other end) before calling this method."
| deadline |
deadline := Time millisecondClockValue + (timeout * 1000)
truncated.
[ self isOtherEndConnected and: [ deadline - Time
millisecondClockValue > 0 ] ]
whileTrue: [
self discardReceivedData.
"Providing a maximum for the time for waiting is a
workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Remove the ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is fixed."
readSemaphore waitTimeoutMSecs:
(deadline - Time millisecondClockValue
min: self class maximumReadSemaphoreWaitTimeout) ].
^self isOtherEndConnected!
| startTime msecsDelta status |
startTime := Time millisecondClockValue.
msecsDelta := (timeout * 1000) truncated.
status := self primSocketConnectionStatus: socketHandle.
[((status == Connected) or: [(status == ThisEndClosed)]) and:
[(Time millisecondsSince: startTime) < msecsDelta]] whileTrue: [
self discardReceivedData.
"Providing a maximum for the time for waiting is a
workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Remove the ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is fixed."
readSemaphore waitTimeoutMSecs:
(msecsDelta - (Time millisecondsSince: startTime)
min: self class maximumReadSemaphoreWaitTimeout).
status := self primSocketConnectionStatus: socketHandle].
^ status ~= Connected!
Item was changed: ----- Method: Socket>>waitForSendDoneFor: (in category 'waiting') ----- waitForSendDoneFor: timeout "Wait up until the given deadline for the current send operation to complete. Return true if it completes by the deadline, false if not."
| deadline timeleft |
deadline := Time millisecondClockValue + (timeout * 1000)
truncated.
| startTime msecsDelta msecsEllapsed |
startTime := Time millisecondClockValue.
msecsDelta := (timeout * 1000) truncated. [ (self primSocketSendDone: socketHandle) ifTrue: [ ^true ].
self isThisEndConnected ifFalse: [ ^false ].
(timeleft := deadline - Time millisecondClockValue) <= 0
ifTrue: [ ^false ].
writeSemaphore waitTimeoutMSecs: timeleft ] repeat!
self isConnected ifFalse: [ ^false ].
(msecsEllapsed := Time millisecondsSince: startTime) <
msecsDelta ifFalse: [ ^false ].
writeSemaphore waitTimeoutMSecs: msecsDelta -
msecsEllapsed ] repeat!
Hi Chris,
I decide to move this to the Trunk because the feature freeze is here. This should also help getting more feedback. :)
Levente
On Mon, 25 Jul 2016, Chris Muller wrote:
Hi Levente, Magma's test cases can't seem to get to the end with this.. I haven't investigated yet..
On Mon, Jul 25, 2016 at 2:40 PM, Levente Uzonyi leves@caesar.elte.hu wrote:
Hi All,
This is something to test on all platforms before push. I've used it on 64-bit linux without problems so far.
Levente
On Mon, 25 Jul 2016, commits@source.squeak.org wrote:
Levente Uzonyi uploaded a new version of Network to project The Inbox: http://source.squeak.org/inbox/Network-ul.180.mcz
==================== Summary ====================
Name: Network-ul.180 Author: ul Time: 25 July 2016, 8:40:01.001452 pm UUID: 2f23a55c-fec5-41ac-95bd-6a8c2458be95 Ancestors: Network-nice.179
Socket changes:
- fixed the comment of #isOtherEndConnected and #isThisEndConnected
- do not slice the data (TCP) in the image in #sendData:. Let the VM, the
kernel, the hardware deal with that.
- use #isOtherEndConnected when receiving data, and #isThisEndConnected
when sending data instead of #isConnected
- move away from #milliseconds:since:, since we have a clock that won't
roll over
=============== Diff against Network-nice.179 ===============
Item was changed: ----- Method: Socket>>closeAndDestroy: (in category 'connection open/close') ----- closeAndDestroy: timeoutSeconds "First, try to close this connection gracefully. If the close attempt fails or times out, abort the connection. In either case, destroy the socket. Do nothing if the socket has already been destroyed (i.e., if its socketHandle is nil)."
socketHandle ifNil: [ ^self ].
self isThisEndConnected ifTrue: [
self close. "Close this end." ].
(self waitForDisconnectionFor: timeoutSeconds) ifFalse: [
"The other end has not closed the connect yet, so we will
just abort it."
self primSocketAbortConnection: socketHandle ].
self destroy!
socketHandle ifNotNil: [
self isConnected ifTrue: [
self close. "close this end"
(self waitForDisconnectionFor:
timeoutSeconds) ifFalse: [
"The other end didn't
close so we just abort the connection"
self
primSocketAbortConnection: socketHandle]].
self destroy].
- !
Item was changed: ----- Method: Socket>>discardReceivedData (in category 'receiving') ----- discardReceivedData "Discard any data received up until now, and return the number of bytes discarded."
| buf totalBytesDiscarded | buf := String new: 10000. totalBytesDiscarded := 0.
[self isOtherEndConnected and: [self dataAvailable]] whileTrue: [
[self isConnected and: [self dataAvailable]] whileTrue: [ totalBytesDiscarded := totalBytesDiscarded + (self receiveDataInto:
buf)]. ^ totalBytesDiscarded !
Item was changed: ----- Method: Socket>>isOtherEndConnected (in category 'queries') ----- isOtherEndConnected
"Return true if this socket is connected, or this end has closed
the connection but not the other end, so we can still receive data."
"Return true if this socket is connected, or this end has closed
the connection but not the other end, so we can still send data."
| state | socketHandle ifNil: [ ^false ]. (state := self primSocketConnectionStatus: socketHandle) ==
Connected ifTrue: [ ^true ]. ^state == ThisEndClosed !
Item was changed: ----- Method: Socket>>isThisEndConnected (in category 'queries') ----- isThisEndConnected
"Return true if this socket is connected, other the other end has
closed the connection but not this end, so we can still send data."
"Return true if this socket is connected, other the other end has
closed the connection but not this end, so we can still receive data."
| state | socketHandle ifNil: [ ^false ]. (state := self primSocketConnectionStatus: socketHandle) ==
Connected ifTrue: [ ^true ]. ^state == OtherEndClosed !
Item was changed: ----- Method: Socket>>sendData: (in category 'sending') ----- sendData: aStringOrByteArray "Send all of the data in the given array, even if it requires multiple calls to send it all. Return the number of bytes sent."
"An experimental version use on slow lines: Longer timeout and
smaller writes to try to avoid spurious timeouts."
| bytesSent bytesToSend count | bytesToSend := aStringOrByteArray size. bytesSent := 0. [bytesSent < bytesToSend] whileTrue: [ (self waitForSendDoneFor: 60) ifFalse: [ConnectionTimedOut signal: 'send data
timeout; data not sent']. count := self primSocket: socketHandle sendData: aStringOrByteArray startIndex: bytesSent + 1
count: bytesToSend - bytesSent.
count: (bytesToSend - bytesSent min:
DefaultSendBufferSize). bytesSent := bytesSent + count].
^ bytesSent
!
Item was changed: ----- Method: Socket>>waitForConnectionFor:ifTimedOut:ifRefused: (in category 'waiting') ----- waitForConnectionFor: timeout ifTimedOut: timeoutBlock ifRefused: refusedBlock "Wait up until the given deadline for a connection to be established. Return true if it is established by the deadline, false if not."
| deadline timeLeft status |
deadline := Time millisecondClockValue + (timeout * 1000)
truncated.
(status := self primSocketConnectionStatus: socketHandle) ==
Connected ifTrue: [^true].
[ (status == WaitingForConnection) and: [ (timeLeft := deadline -
Time millisecondClockValue) > 0 ] ]
| startTime msecsDelta msecsEllapsed status |
startTime := Time millisecondClockValue.
msecsDelta := (timeout * 1000) truncated.
status := self primSocketConnectionStatus: socketHandle.
status = Connected ifTrue: [^true].
[(status = WaitingForConnection) and: [(msecsEllapsed := Time
millisecondsSince: startTime) < msecsDelta]] whileTrue: [
semaphore waitTimeoutMSecs: timeLeft.
status := self primSocketConnectionStatus:
socketHandle ].
status == Connected ifTrue: [ ^true ].
status == WaitingForConnection
ifTrue: [ timeoutBlock value ]
ifFalse: [ refusedBlock value ].
^false!
semaphore waitTimeoutMSecs: msecsDelta -
msecsEllapsed.
status := self primSocketConnectionStatus:
socketHandle].
status = Connected
ifFalse: [
status = WaitingForConnection
ifTrue: [timeoutBlock value]
ifFalse: [refusedBlock value].
^false].
^ true!
Item was changed:
----- Method: Socket>>waitForConnectionUntil: (in category 'waiting')
waitForConnectionUntil: deadline "Wait up until the given deadline for a connection to be established. Return true if it is established by the deadline, false if not."
| status timeLeft |
| status waitTime | [ (status := self primSocketConnectionStatus: socketHandle)
== Connected ifTrue: [ ^true ]. status == WaitingForConnection ifFalse: [ ^false ].
(timeLeft := deadline - Time millisecondClockValue) <= 0
ifTrue: [ ^false ].
semaphore waitTimeoutMSecs: timeLeft ] repeat!
(waitTime := deadline - Time millisecondClockValue) > 0
ifFalse: [ ^false ].
semaphore waitTimeoutMSecs: waitTime ] repeat!
Item was changed: ----- Method: Socket>>waitForDataFor:ifClosed:ifTimedOut: (in category 'waiting') ----- waitForDataFor: timeout ifClosed: closedBlock ifTimedOut: timedOutBlock "Wait for the given nr of seconds for data to arrive."
| deadline timeLeft |
| startTime msecsDelta | socketHandle ifNil: [ ^closedBlock value ].
deadline := Time millisecondClockValue + (timeout * 1000)
truncated.
startTime := Time millisecondClockValue.
msecsDelta := (timeout * 1000) truncated. [ (self primSocketReceiveDataAvailable: socketHandle)
ifTrue: [ ^self ].
self isOtherEndConnected ifFalse: [ ^closedBlock value ].
(timeLeft := deadline - Time millisecondClockValue) <= 0
ifTrue: [ ^timedOutBlock value ].
self isConnected ifFalse: [ ^closedBlock value ].
(Time millisecondsSince: startTime) < msecsDelta ifFalse:
[ ^timedOutBlock value ]. "Providing a maximum for the time for waiting is a workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Remove the ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is fixed." readSemaphore waitTimeoutMSecs:
(timeLeft min: self class
maximumReadSemaphoreWaitTimeout) ] repeat!
(msecsDelta - (Time millisecondsSince: startTime)
min: self class maximumReadSemaphoreWaitTimeout) ] repeat!
Item was changed: ----- Method: Socket>>waitForDataIfClosed: (in category 'waiting') ----- waitForDataIfClosed: closedBlock "Wait indefinitely for data to arrive. This method will block until data is available or the socket is closed."
socketHandle ifNil: [ ^closedBlock value ]. [ (self primSocketReceiveDataAvailable: socketHandle)
ifTrue: [ ^self ].
self isOtherEndConnected ifFalse: [ ^closedBlock value ].
self isConnected ifFalse: [ ^closedBlock value ]. "ul 8/13/2014 21:16 Providing a maximum for the time for waiting is a
workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Replace the ""waitTimeoutMSecs: self class maximumReadSemaphoreWaitTimeout"" part with ""wait"" when the bug is fixed." readSemaphore waitTimeoutMSecs: self class maximumReadSemaphoreWaitTimeout ] repeat!
Item was changed:
----- Method: Socket>>waitForDisconnectionFor: (in category 'waiting')
waitForDisconnectionFor: timeout "Wait for the given nr of seconds for the connection to be broken. Return true if it is broken by the deadline, false if not. The client should know the connection is really going to be closed (e.g., because he has called 'close' to send a close request to the other end) before calling this method."
| deadline |
deadline := Time millisecondClockValue + (timeout * 1000)
truncated.
[ self isOtherEndConnected and: [ deadline - Time
millisecondClockValue > 0 ] ]
whileTrue: [
self discardReceivedData.
"Providing a maximum for the time for waiting is a
workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Remove the ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is fixed."
readSemaphore waitTimeoutMSecs:
(deadline - Time millisecondClockValue
min: self class maximumReadSemaphoreWaitTimeout) ].
^self isOtherEndConnected!
| startTime msecsDelta status |
startTime := Time millisecondClockValue.
msecsDelta := (timeout * 1000) truncated.
status := self primSocketConnectionStatus: socketHandle.
[((status == Connected) or: [(status == ThisEndClosed)]) and:
[(Time millisecondsSince: startTime) < msecsDelta]] whileTrue: [
self discardReceivedData.
"Providing a maximum for the time for waiting is a
workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Remove the ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is fixed."
readSemaphore waitTimeoutMSecs:
(msecsDelta - (Time millisecondsSince: startTime)
min: self class maximumReadSemaphoreWaitTimeout).
status := self primSocketConnectionStatus: socketHandle].
^ status ~= Connected!
Item was changed: ----- Method: Socket>>waitForSendDoneFor: (in category 'waiting') ----- waitForSendDoneFor: timeout "Wait up until the given deadline for the current send operation to complete. Return true if it completes by the deadline, false if not."
| deadline timeleft |
deadline := Time millisecondClockValue + (timeout * 1000)
truncated.
| startTime msecsDelta msecsEllapsed |
startTime := Time millisecondClockValue.
msecsDelta := (timeout * 1000) truncated. [ (self primSocketSendDone: socketHandle) ifTrue: [ ^true ].
self isThisEndConnected ifFalse: [ ^false ].
(timeleft := deadline - Time millisecondClockValue) <= 0
ifTrue: [ ^false ].
writeSemaphore waitTimeoutMSecs: timeleft ] repeat!
self isConnected ifFalse: [ ^false ].
(msecsEllapsed := Time millisecondsSince: startTime) <
msecsDelta ifFalse: [ ^false ].
writeSemaphore waitTimeoutMSecs: msecsDelta -
msecsEllapsed ] repeat!
Hi Levente, I've narrowed Magma's problem with these changes down to one line in Socket>>#waitForSendDoneFor:. If I revert the call to #isThisEndConnected back to #isConnected, Magma's test suite works fine.
waitForSendDoneFor: timeout "Wait up until the given deadline for the current send operation to complete. Return true if it completes by the deadline, false if not."
| deadline timeleft | deadline := Time millisecondClockValue + (timeout * 1000) truncated. [ (self primSocketSendDone: socketHandle) ifTrue: [ ^true ]. self isThisEndConnected ifFalse: [ ^false ]. "<---- want to go back to #isConnected for this line" (timeleft := deadline - Time millisecondClockValue) <= 0 ifTrue: [ ^false ]. writeSemaphore waitTimeoutMSecs: timeleft ] repeat
It affects Magma's High-Availability test cases, which involve terminating the server while its busy serving clients. For each release of Squeak, I put out a new release of Magma that works OOTB. I would very much like to be able to do that this time too, without confusion of a changeset / modified system for Magma users. Would you be willing to revert this one line to buy us time to explore this change together under greater scrutiny for the duration of the next release of Squeak, instead of this release?
Best, Chris
On Sun, Jul 31, 2016 at 8:39 PM, Levente Uzonyi leves@caesar.elte.hu wrote:
Hi Chris,
I decide to move this to the Trunk because the feature freeze is here. This should also help getting more feedback. :)
Levente
On Mon, 25 Jul 2016, Chris Muller wrote:
Hi Levente, Magma's test cases can't seem to get to the end with this.. I haven't investigated yet..
On Mon, Jul 25, 2016 at 2:40 PM, Levente Uzonyi leves@caesar.elte.hu wrote:
Hi All,
This is something to test on all platforms before push. I've used it on 64-bit linux without problems so far.
Levente
On Mon, 25 Jul 2016, commits@source.squeak.org wrote:
Levente Uzonyi uploaded a new version of Network to project The Inbox: http://source.squeak.org/inbox/Network-ul.180.mcz
==================== Summary ====================
Name: Network-ul.180 Author: ul Time: 25 July 2016, 8:40:01.001452 pm UUID: 2f23a55c-fec5-41ac-95bd-6a8c2458be95 Ancestors: Network-nice.179
Socket changes:
- fixed the comment of #isOtherEndConnected and #isThisEndConnected
- do not slice the data (TCP) in the image in #sendData:. Let the VM,
the kernel, the hardware deal with that.
- use #isOtherEndConnected when receiving data, and #isThisEndConnected
when sending data instead of #isConnected
- move away from #milliseconds:since:, since we have a clock that won't
roll over
=============== Diff against Network-nice.179 ===============
Item was changed: ----- Method: Socket>>closeAndDestroy: (in category 'connection open/close') ----- closeAndDestroy: timeoutSeconds "First, try to close this connection gracefully. If the close attempt fails or times out, abort the connection. In either case, destroy the socket. Do nothing if the socket has already been destroyed (i.e., if its socketHandle is nil)."
socketHandle ifNil: [ ^self ].
self isThisEndConnected ifTrue: [
self close. "Close this end." ].
(self waitForDisconnectionFor: timeoutSeconds) ifFalse: [
"The other end has not closed the connect yet, so we
will just abort it."
self primSocketAbortConnection: socketHandle ].
self destroy!
socketHandle ifNotNil: [
self isConnected ifTrue: [
self close. "close this end"
(self waitForDisconnectionFor:
timeoutSeconds) ifFalse: [
"The other end didn't
close so we just abort the connection"
self
primSocketAbortConnection: socketHandle]].
self destroy].
- !
Item was changed:
----- Method: Socket>>discardReceivedData (in category 'receiving')
discardReceivedData "Discard any data received up until now, and return the number of bytes discarded."
| buf totalBytesDiscarded | buf := String new: 10000. totalBytesDiscarded := 0.
[self isOtherEndConnected and: [self dataAvailable]] whileTrue:
[
[self isConnected and: [self dataAvailable]] whileTrue: [ totalBytesDiscarded := totalBytesDiscarded + (self receiveDataInto:
buf)]. ^ totalBytesDiscarded !
Item was changed: ----- Method: Socket>>isOtherEndConnected (in category 'queries') ----- isOtherEndConnected
"Return true if this socket is connected, or this end has closed
the connection but not the other end, so we can still receive data."
"Return true if this socket is connected, or this end has closed
the connection but not the other end, so we can still send data."
| state | socketHandle ifNil: [ ^false ]. (state := self primSocketConnectionStatus: socketHandle) ==
Connected ifTrue: [ ^true ]. ^state == ThisEndClosed !
Item was changed: ----- Method: Socket>>isThisEndConnected (in category 'queries') ----- isThisEndConnected
"Return true if this socket is connected, other the other end
has closed the connection but not this end, so we can still send data."
"Return true if this socket is connected, other the other end
has closed the connection but not this end, so we can still receive data."
| state | socketHandle ifNil: [ ^false ]. (state := self primSocketConnectionStatus: socketHandle) ==
Connected ifTrue: [ ^true ]. ^state == OtherEndClosed !
Item was changed: ----- Method: Socket>>sendData: (in category 'sending') ----- sendData: aStringOrByteArray "Send all of the data in the given array, even if it requires multiple calls to send it all. Return the number of bytes sent."
"An experimental version use on slow lines: Longer timeout and
smaller writes to try to avoid spurious timeouts."
| bytesSent bytesToSend count | bytesToSend := aStringOrByteArray size. bytesSent := 0. [bytesSent < bytesToSend] whileTrue: [ (self waitForSendDoneFor: 60) ifFalse: [ConnectionTimedOut signal: 'send data
timeout; data not sent']. count := self primSocket: socketHandle sendData: aStringOrByteArray startIndex: bytesSent + 1
count: bytesToSend - bytesSent.
count: (bytesToSend - bytesSent min:
DefaultSendBufferSize). bytesSent := bytesSent + count].
^ bytesSent
!
Item was changed: ----- Method: Socket>>waitForConnectionFor:ifTimedOut:ifRefused: (in category 'waiting') ----- waitForConnectionFor: timeout ifTimedOut: timeoutBlock ifRefused: refusedBlock "Wait up until the given deadline for a connection to be established. Return true if it is established by the deadline, false if not."
| deadline timeLeft status |
deadline := Time millisecondClockValue + (timeout * 1000)
truncated.
(status := self primSocketConnectionStatus: socketHandle) ==
Connected ifTrue: [^true].
[ (status == WaitingForConnection) and: [ (timeLeft := deadline
Time millisecondClockValue) > 0 ] ]
| startTime msecsDelta msecsEllapsed status |
startTime := Time millisecondClockValue.
msecsDelta := (timeout * 1000) truncated.
status := self primSocketConnectionStatus: socketHandle.
status = Connected ifTrue: [^true].
[(status = WaitingForConnection) and: [(msecsEllapsed := Time
millisecondsSince: startTime) < msecsDelta]] whileTrue: [
semaphore waitTimeoutMSecs: timeLeft.
status := self primSocketConnectionStatus:
socketHandle ].
status == Connected ifTrue: [ ^true ].
status == WaitingForConnection
ifTrue: [ timeoutBlock value ]
ifFalse: [ refusedBlock value ].
^false!
semaphore waitTimeoutMSecs: msecsDelta -
msecsEllapsed.
status := self primSocketConnectionStatus:
socketHandle].
status = Connected
ifFalse: [
status = WaitingForConnection
ifTrue: [timeoutBlock value]
ifFalse: [refusedBlock value].
^false].
^ true!
Item was changed:
----- Method: Socket>>waitForConnectionUntil: (in category 'waiting')
waitForConnectionUntil: deadline "Wait up until the given deadline for a connection to be established. Return true if it is established by the deadline, false if not."
| status timeLeft |
| status waitTime | [ (status := self primSocketConnectionStatus:
socketHandle) == Connected ifTrue: [ ^true ]. status == WaitingForConnection ifFalse: [ ^false ].
(timeLeft := deadline - Time millisecondClockValue) <= 0
ifTrue: [ ^false ].
semaphore waitTimeoutMSecs: timeLeft ] repeat!
(waitTime := deadline - Time millisecondClockValue) > 0
ifFalse: [ ^false ].
semaphore waitTimeoutMSecs: waitTime ] repeat!
Item was changed: ----- Method: Socket>>waitForDataFor:ifClosed:ifTimedOut: (in category 'waiting') ----- waitForDataFor: timeout ifClosed: closedBlock ifTimedOut: timedOutBlock "Wait for the given nr of seconds for data to arrive."
| deadline timeLeft |
| startTime msecsDelta | socketHandle ifNil: [ ^closedBlock value ].
deadline := Time millisecondClockValue + (timeout * 1000)
truncated.
startTime := Time millisecondClockValue.
msecsDelta := (timeout * 1000) truncated. [ (self primSocketReceiveDataAvailable: socketHandle)
ifTrue: [ ^self ].
self isOtherEndConnected ifFalse: [ ^closedBlock value
].
(timeLeft := deadline - Time millisecondClockValue) <= 0
ifTrue: [ ^timedOutBlock value ].
self isConnected ifFalse: [ ^closedBlock value ].
(Time millisecondsSince: startTime) < msecsDelta
ifFalse: [ ^timedOutBlock value ]. "Providing a maximum for the time for waiting is a workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Remove the ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is fixed." readSemaphore waitTimeoutMSecs:
(timeLeft min: self class
maximumReadSemaphoreWaitTimeout) ] repeat!
(msecsDelta - (Time millisecondsSince:
startTime) min: self class maximumReadSemaphoreWaitTimeout) ] repeat!
Item was changed:
----- Method: Socket>>waitForDataIfClosed: (in category 'waiting')
waitForDataIfClosed: closedBlock "Wait indefinitely for data to arrive. This method will block until data is available or the socket is closed."
socketHandle ifNil: [ ^closedBlock value ]. [ (self primSocketReceiveDataAvailable: socketHandle)
ifTrue: [ ^self ].
self isOtherEndConnected ifFalse: [ ^closedBlock value
].
self isConnected ifFalse: [ ^closedBlock value ]. "ul 8/13/2014 21:16 Providing a maximum for the time for waiting is a
workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Replace the ""waitTimeoutMSecs: self class maximumReadSemaphoreWaitTimeout"" part with ""wait"" when the bug is fixed." readSemaphore waitTimeoutMSecs: self class maximumReadSemaphoreWaitTimeout ] repeat!
Item was changed:
----- Method: Socket>>waitForDisconnectionFor: (in category 'waiting')
waitForDisconnectionFor: timeout "Wait for the given nr of seconds for the connection to be broken. Return true if it is broken by the deadline, false if not. The client should know the connection is really going to be closed (e.g., because he has called 'close' to send a close request to the other end) before calling this method."
| deadline |
deadline := Time millisecondClockValue + (timeout * 1000)
truncated.
[ self isOtherEndConnected and: [ deadline - Time
millisecondClockValue > 0 ] ]
whileTrue: [
self discardReceivedData.
"Providing a maximum for the time for waiting is
a workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Remove the ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is fixed."
readSemaphore waitTimeoutMSecs:
(deadline - Time millisecondClockValue
min: self class maximumReadSemaphoreWaitTimeout) ].
^self isOtherEndConnected!
| startTime msecsDelta status |
startTime := Time millisecondClockValue.
msecsDelta := (timeout * 1000) truncated.
status := self primSocketConnectionStatus: socketHandle.
[((status == Connected) or: [(status == ThisEndClosed)]) and:
[(Time millisecondsSince: startTime) < msecsDelta]] whileTrue:
[
self discardReceivedData.
"Providing a maximum for the time for waiting is a
workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Remove the ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is fixed."
readSemaphore waitTimeoutMSecs:
(msecsDelta - (Time millisecondsSince:
startTime) min: self class maximumReadSemaphoreWaitTimeout).
status := self primSocketConnectionStatus:
socketHandle].
^ status ~= Connected!
Item was changed: ----- Method: Socket>>waitForSendDoneFor: (in category 'waiting') ----- waitForSendDoneFor: timeout "Wait up until the given deadline for the current send operation to complete. Return true if it completes by the deadline, false if not."
| deadline timeleft |
deadline := Time millisecondClockValue + (timeout * 1000)
truncated.
| startTime msecsDelta msecsEllapsed |
startTime := Time millisecondClockValue.
msecsDelta := (timeout * 1000) truncated. [ (self primSocketSendDone: socketHandle) ifTrue: [ ^true
].
self isThisEndConnected ifFalse: [ ^false ].
(timeleft := deadline - Time millisecondClockValue) <= 0
ifTrue: [ ^false ].
writeSemaphore waitTimeoutMSecs: timeleft ] repeat!
self isConnected ifFalse: [ ^false ].
(msecsEllapsed := Time millisecondsSince: startTime) <
msecsDelta ifFalse: [ ^false ].
writeSemaphore waitTimeoutMSecs: msecsDelta -
msecsEllapsed ] repeat!
Hi Chris,
Thanks for tracking this down. I'll write some tests to see if the VM actually supports half-closed connections properly when I have time. If not, I'll revert this change before the release.
Levente
On Tue, 2 Aug 2016, Chris Muller wrote:
Hi Levente, I've narrowed Magma's problem with these changes down to one line in Socket>>#waitForSendDoneFor:. If I revert the call to #isThisEndConnected back to #isConnected, Magma's test suite works fine.
waitForSendDoneFor: timeout "Wait up until the given deadline for the current send operation to complete. Return true if it completes by the deadline, false if not."
| deadline timeleft | deadline := Time millisecondClockValue + (timeout * 1000) truncated. [ (self primSocketSendDone: socketHandle) ifTrue: [ ^true ]. self isThisEndConnected ifFalse: [ ^false ]. "<---- want to go back to #isConnected for this line" (timeleft := deadline - Time millisecondClockValue) <= 0 ifTrue: [ ^false ]. writeSemaphore waitTimeoutMSecs: timeleft ] repeat
It affects Magma's High-Availability test cases, which involve terminating the server while its busy serving clients. For each release of Squeak, I put out a new release of Magma that works OOTB. I would very much like to be able to do that this time too, without confusion of a changeset / modified system for Magma users. Would you be willing to revert this one line to buy us time to explore this change together under greater scrutiny for the duration of the next release of Squeak, instead of this release?
Best, Chris
On Sun, Jul 31, 2016 at 8:39 PM, Levente Uzonyi leves@caesar.elte.hu wrote:
Hi Chris,
I decide to move this to the Trunk because the feature freeze is here. This should also help getting more feedback. :)
Levente
On Mon, 25 Jul 2016, Chris Muller wrote:
Hi Levente, Magma's test cases can't seem to get to the end with this.. I haven't investigated yet..
On Mon, Jul 25, 2016 at 2:40 PM, Levente Uzonyi leves@caesar.elte.hu wrote:
Hi All,
This is something to test on all platforms before push. I've used it on 64-bit linux without problems so far.
Levente
On Mon, 25 Jul 2016, commits@source.squeak.org wrote:
Levente Uzonyi uploaded a new version of Network to project The Inbox: http://source.squeak.org/inbox/Network-ul.180.mcz
==================== Summary ====================
Name: Network-ul.180 Author: ul Time: 25 July 2016, 8:40:01.001452 pm UUID: 2f23a55c-fec5-41ac-95bd-6a8c2458be95 Ancestors: Network-nice.179
Socket changes:
- fixed the comment of #isOtherEndConnected and #isThisEndConnected
- do not slice the data (TCP) in the image in #sendData:. Let the VM,
the kernel, the hardware deal with that.
- use #isOtherEndConnected when receiving data, and #isThisEndConnected
when sending data instead of #isConnected
- move away from #milliseconds:since:, since we have a clock that won't
roll over
=============== Diff against Network-nice.179 ===============
Item was changed: ----- Method: Socket>>closeAndDestroy: (in category 'connection open/close') ----- closeAndDestroy: timeoutSeconds "First, try to close this connection gracefully. If the close attempt fails or times out, abort the connection. In either case, destroy the socket. Do nothing if the socket has already been destroyed (i.e., if its socketHandle is nil)."
socketHandle ifNil: [ ^self ].
self isThisEndConnected ifTrue: [
self close. "Close this end." ].
(self waitForDisconnectionFor: timeoutSeconds) ifFalse: [
"The other end has not closed the connect yet, so we
will just abort it."
self primSocketAbortConnection: socketHandle ].
self destroy!
socketHandle ifNotNil: [
self isConnected ifTrue: [
self close. "close this end"
(self waitForDisconnectionFor:
timeoutSeconds) ifFalse: [
"The other end didn't
close so we just abort the connection"
self
primSocketAbortConnection: socketHandle]].
self destroy].
- !
Item was changed:
----- Method: Socket>>discardReceivedData (in category 'receiving')
discardReceivedData "Discard any data received up until now, and return the number of bytes discarded."
| buf totalBytesDiscarded | buf := String new: 10000. totalBytesDiscarded := 0.
[self isOtherEndConnected and: [self dataAvailable]] whileTrue:
[
[self isConnected and: [self dataAvailable]] whileTrue: [ totalBytesDiscarded := totalBytesDiscarded + (self receiveDataInto:
buf)]. ^ totalBytesDiscarded !
Item was changed: ----- Method: Socket>>isOtherEndConnected (in category 'queries') ----- isOtherEndConnected
"Return true if this socket is connected, or this end has closed
the connection but not the other end, so we can still receive data."
"Return true if this socket is connected, or this end has closed
the connection but not the other end, so we can still send data."
| state | socketHandle ifNil: [ ^false ]. (state := self primSocketConnectionStatus: socketHandle) ==
Connected ifTrue: [ ^true ]. ^state == ThisEndClosed !
Item was changed: ----- Method: Socket>>isThisEndConnected (in category 'queries') ----- isThisEndConnected
"Return true if this socket is connected, other the other end
has closed the connection but not this end, so we can still send data."
"Return true if this socket is connected, other the other end
has closed the connection but not this end, so we can still receive data."
| state | socketHandle ifNil: [ ^false ]. (state := self primSocketConnectionStatus: socketHandle) ==
Connected ifTrue: [ ^true ]. ^state == OtherEndClosed !
Item was changed: ----- Method: Socket>>sendData: (in category 'sending') ----- sendData: aStringOrByteArray "Send all of the data in the given array, even if it requires multiple calls to send it all. Return the number of bytes sent."
"An experimental version use on slow lines: Longer timeout and
smaller writes to try to avoid spurious timeouts."
| bytesSent bytesToSend count | bytesToSend := aStringOrByteArray size. bytesSent := 0. [bytesSent < bytesToSend] whileTrue: [ (self waitForSendDoneFor: 60) ifFalse: [ConnectionTimedOut signal: 'send data
timeout; data not sent']. count := self primSocket: socketHandle sendData: aStringOrByteArray startIndex: bytesSent + 1
count: bytesToSend - bytesSent.
count: (bytesToSend - bytesSent min:
DefaultSendBufferSize). bytesSent := bytesSent + count].
^ bytesSent
!
Item was changed: ----- Method: Socket>>waitForConnectionFor:ifTimedOut:ifRefused: (in category 'waiting') ----- waitForConnectionFor: timeout ifTimedOut: timeoutBlock ifRefused: refusedBlock "Wait up until the given deadline for a connection to be established. Return true if it is established by the deadline, false if not."
| deadline timeLeft status |
deadline := Time millisecondClockValue + (timeout * 1000)
truncated.
(status := self primSocketConnectionStatus: socketHandle) ==
Connected ifTrue: [^true].
[ (status == WaitingForConnection) and: [ (timeLeft := deadline
Time millisecondClockValue) > 0 ] ]
| startTime msecsDelta msecsEllapsed status |
startTime := Time millisecondClockValue.
msecsDelta := (timeout * 1000) truncated.
status := self primSocketConnectionStatus: socketHandle.
status = Connected ifTrue: [^true].
[(status = WaitingForConnection) and: [(msecsEllapsed := Time
millisecondsSince: startTime) < msecsDelta]] whileTrue: [
semaphore waitTimeoutMSecs: timeLeft.
status := self primSocketConnectionStatus:
socketHandle ].
status == Connected ifTrue: [ ^true ].
status == WaitingForConnection
ifTrue: [ timeoutBlock value ]
ifFalse: [ refusedBlock value ].
^false!
semaphore waitTimeoutMSecs: msecsDelta -
msecsEllapsed.
status := self primSocketConnectionStatus:
socketHandle].
status = Connected
ifFalse: [
status = WaitingForConnection
ifTrue: [timeoutBlock value]
ifFalse: [refusedBlock value].
^false].
^ true!
Item was changed:
----- Method: Socket>>waitForConnectionUntil: (in category 'waiting')
waitForConnectionUntil: deadline "Wait up until the given deadline for a connection to be established. Return true if it is established by the deadline, false if not."
| status timeLeft |
| status waitTime | [ (status := self primSocketConnectionStatus:
socketHandle) == Connected ifTrue: [ ^true ]. status == WaitingForConnection ifFalse: [ ^false ].
(timeLeft := deadline - Time millisecondClockValue) <= 0
ifTrue: [ ^false ].
semaphore waitTimeoutMSecs: timeLeft ] repeat!
(waitTime := deadline - Time millisecondClockValue) > 0
ifFalse: [ ^false ].
semaphore waitTimeoutMSecs: waitTime ] repeat!
Item was changed: ----- Method: Socket>>waitForDataFor:ifClosed:ifTimedOut: (in category 'waiting') ----- waitForDataFor: timeout ifClosed: closedBlock ifTimedOut: timedOutBlock "Wait for the given nr of seconds for data to arrive."
| deadline timeLeft |
| startTime msecsDelta | socketHandle ifNil: [ ^closedBlock value ].
deadline := Time millisecondClockValue + (timeout * 1000)
truncated.
startTime := Time millisecondClockValue.
msecsDelta := (timeout * 1000) truncated. [ (self primSocketReceiveDataAvailable: socketHandle)
ifTrue: [ ^self ].
self isOtherEndConnected ifFalse: [ ^closedBlock value
].
(timeLeft := deadline - Time millisecondClockValue) <= 0
ifTrue: [ ^timedOutBlock value ].
self isConnected ifFalse: [ ^closedBlock value ].
(Time millisecondsSince: startTime) < msecsDelta
ifFalse: [ ^timedOutBlock value ]. "Providing a maximum for the time for waiting is a workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Remove the ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is fixed." readSemaphore waitTimeoutMSecs:
(timeLeft min: self class
maximumReadSemaphoreWaitTimeout) ] repeat!
(msecsDelta - (Time millisecondsSince:
startTime) min: self class maximumReadSemaphoreWaitTimeout) ] repeat!
Item was changed:
----- Method: Socket>>waitForDataIfClosed: (in category 'waiting')
waitForDataIfClosed: closedBlock "Wait indefinitely for data to arrive. This method will block until data is available or the socket is closed."
socketHandle ifNil: [ ^closedBlock value ]. [ (self primSocketReceiveDataAvailable: socketHandle)
ifTrue: [ ^self ].
self isOtherEndConnected ifFalse: [ ^closedBlock value
].
self isConnected ifFalse: [ ^closedBlock value ]. "ul 8/13/2014 21:16 Providing a maximum for the time for waiting is a
workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Replace the ""waitTimeoutMSecs: self class maximumReadSemaphoreWaitTimeout"" part with ""wait"" when the bug is fixed." readSemaphore waitTimeoutMSecs: self class maximumReadSemaphoreWaitTimeout ] repeat!
Item was changed:
----- Method: Socket>>waitForDisconnectionFor: (in category 'waiting')
waitForDisconnectionFor: timeout "Wait for the given nr of seconds for the connection to be broken. Return true if it is broken by the deadline, false if not. The client should know the connection is really going to be closed (e.g., because he has called 'close' to send a close request to the other end) before calling this method."
| deadline |
deadline := Time millisecondClockValue + (timeout * 1000)
truncated.
[ self isOtherEndConnected and: [ deadline - Time
millisecondClockValue > 0 ] ]
whileTrue: [
self discardReceivedData.
"Providing a maximum for the time for waiting is
a workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Remove the ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is fixed."
readSemaphore waitTimeoutMSecs:
(deadline - Time millisecondClockValue
min: self class maximumReadSemaphoreWaitTimeout) ].
^self isOtherEndConnected!
| startTime msecsDelta status |
startTime := Time millisecondClockValue.
msecsDelta := (timeout * 1000) truncated.
status := self primSocketConnectionStatus: socketHandle.
[((status == Connected) or: [(status == ThisEndClosed)]) and:
[(Time millisecondsSince: startTime) < msecsDelta]] whileTrue:
[
self discardReceivedData.
"Providing a maximum for the time for waiting is a
workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Remove the ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is fixed."
readSemaphore waitTimeoutMSecs:
(msecsDelta - (Time millisecondsSince:
startTime) min: self class maximumReadSemaphoreWaitTimeout).
status := self primSocketConnectionStatus:
socketHandle].
^ status ~= Connected!
Item was changed: ----- Method: Socket>>waitForSendDoneFor: (in category 'waiting') ----- waitForSendDoneFor: timeout "Wait up until the given deadline for the current send operation to complete. Return true if it completes by the deadline, false if not."
| deadline timeleft |
deadline := Time millisecondClockValue + (timeout * 1000)
truncated.
| startTime msecsDelta msecsEllapsed |
startTime := Time millisecondClockValue.
msecsDelta := (timeout * 1000) truncated. [ (self primSocketSendDone: socketHandle) ifTrue: [ ^true
].
self isThisEndConnected ifFalse: [ ^false ].
(timeleft := deadline - Time millisecondClockValue) <= 0
ifTrue: [ ^false ].
writeSemaphore waitTimeoutMSecs: timeleft ] repeat!
self isConnected ifFalse: [ ^false ].
(msecsEllapsed := Time millisecondsSince: startTime) <
msecsDelta ifFalse: [ ^false ].
writeSemaphore waitTimeoutMSecs: msecsDelta -
msecsEllapsed ] repeat!
squeak-dev@lists.squeakfoundation.org